diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go
index 346c77e64..ce6932ec1 100644
--- a/internal/cmd/build/main.go
+++ b/internal/cmd/build/main.go
@@ -144,7 +144,7 @@ func (b *builder) integrationTest() error {
 				HostPort:  "127.0.0.1:7233",
 				Namespace: "integration-test-namespace",
 			},
-			LogLevel: "warn",
+			LogLevel: "error",
 			ExtraArgs: []string{
 				"--dynamic-config-value", "frontend.enableExecuteMultiOperation=true",
 				"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
diff --git a/internal/internal_command_state_machine.go b/internal/internal_command_state_machine.go
index c8c90bef2..40deafd37 100644
--- a/internal/internal_command_state_machine.go
+++ b/internal/internal_command_state_machine.go
@@ -1072,13 +1072,18 @@ func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
 }
 
+// getCommand returns the command state machine stored in h.commands under id.
+// When no entry exists it reports a TMPRL1100 error through panicIllegalState.
 func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
-	command, ok := h.commands[id]
+	listElem, ok := h.commands[id]
 	if !ok {
-		panicMsg := fmt.Sprintf("[TMPRL1100] unknown command %v, possible causes are nondeterministic workflow definition code"+
-			" or incompatible change in the workflow definition", id)
+		// On a miss listElem is the map's zero value and must not be
+		// dereferenced, so the message is built from the requested id.
+		panicMsg := fmt.Sprintf(
+			"[TMPRL1100] During replay, workflow history (event ID %s) implies that a %v command should have been emitted by the replayed code, but no such command was emitted. "+
+				"Possible causes are nondeterministic workflow definition code, or an incompatible change in the workflow definition.", id.id, id.commandType)
 		panicIllegalState(panicMsg)
 	}
-	return command.Value.(commandStateMachine)
+	return listElem.Value.(commandStateMachine)
 }
 
 func (h *commandsHelper) addCommand(command commandStateMachine) {
diff --git a/test/integration_test.go b/test/integration_test.go
index 8a05591b3..a1637dc47 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -4591,6 +4591,8 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
 		return false
 	}, 10*time.Second, 300*time.Millisecond)
 
+	ts.T().Logf("\n\n-----------------------------------\n%v\n\n-----------------------------------\n\n", taskFailed)
+
 	// Check the task has the expected cause
 	ts.NoError(histErr)
 	ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, taskFailed.Cause)
@@ -4598,6 +4600,52 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
 	ts.True(taskFailedMetric >= 1)
 }
 
+func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseCommandNotFound() {
+	// Create a situation in which, on replay, a TimerStarted event is
+	// encountered and yet the code emits no corresponding command.
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	// Restore the package-level flag so later tests see the default again.
+	defer func() { shouldStartTimer = true }()
+
+	wfID := "test-non-determinism-failure-cause-command-not-found-" + uuid.New()
+	// Client starts workflow via UpdateWithStart and waits for update response
+	// ("early return" pattern)
+	startWfOptions := ts.startWorkflowOptions(wfID)
+	startWfOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
+	updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{
+		StartWorkflowOperation: ts.client.NewWithStartWorkflowOperation(startWfOptions, ts.workflows.NonDeterminismCommandNotFoundWorkflow),
+		UpdateOptions: client.UpdateWorkflowOptions{
+			WorkflowID:   wfID,
+			UpdateName:   "wait-for-wft-completion",
+			WaitForStage: client.WorkflowUpdateStageCompleted,
+		},
+	})
+	ts.NoError(err)
+
+	// WFT 1: workflow shouldStartTimer state is true, causing workflow to
+	// emit a StartTimer command; workflow sends update response.
+	ts.NoError(updHandle.Get(ctx, nil))
+	// Stop worker and start a new one in order to force full history replay.
+	ts.worker.Stop()
+	nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{WorkflowPanicPolicy: internal.FailWorkflow})
+	ts.registerWorkflowsAndActivities(nextWorker)
+	ts.NoError(nextWorker.Start())
+	defer nextWorker.Stop()
+	// Set shouldStartTimer=false and send second update in order to trigger a
+	// WFT.
+	shouldStartTimer = false
+	updHandle, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
+		WorkflowID:   wfID,
+		UpdateName:   "wait-for-wft-completion",
+		WaitForStage: client.WorkflowUpdateStageCompleted,
+	})
+	ts.NoError(err)
+	// WFT 2: full replay, NDE due to missing StartTimer command.
+	err = updHandle.Get(ctx, nil)
+	ts.T().Logf("\n\n-----------------------------------\n%v\n\n-----------------------------------\n\n", err)
+}
+
 func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseReplay() {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
diff --git a/test/workflow_test.go b/test/workflow_test.go
index 53aae82dd..ba87e508d 100644
--- a/test/workflow_test.go
+++ b/test/workflow_test.go
@@ -2831,6 +2831,26 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
 	return
 }
 
+// shouldStartTimer controls whether NonDeterminismCommandNotFoundWorkflow
+// starts a timer. A test flips it between the first execution and a replay so
+// that the replayed code no longer emits the timer command.
+var shouldStartTimer = true
+
+// NonDeterminismCommandNotFoundWorkflow registers the "wait-for-wft-completion"
+// update handler and then sleeps only while shouldStartTimer is true.
+func (w *Workflows) NonDeterminismCommandNotFoundWorkflow(ctx workflow.Context) error {
+	err := workflow.SetUpdateHandler(ctx, "wait-for-wft-completion", func(ctx workflow.Context) error {
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	if shouldStartTimer {
+		return workflow.Sleep(ctx, 999*time.Hour)
+	}
+	return nil
+}
+
 func (w *Workflows) NonDeterminismReplay(ctx workflow.Context) error {
 	ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
 	var a Activities
@@ -3471,6 +3491,7 @@ func (w *Workflows) register(worker worker.Worker) {
 	worker.RegisterWorkflow(w.SignalCounter)
 	worker.RegisterWorkflow(w.PanicOnSignal)
 	worker.RegisterWorkflow(w.ForcedNonDeterminism)
+	worker.RegisterWorkflow(w.NonDeterminismCommandNotFoundWorkflow)
 	worker.RegisterWorkflow(w.NonDeterminismReplay)
 	worker.RegisterWorkflow(w.MutableSideEffect)
 	worker.RegisterWorkflow(w.HistoryLengths)
@@ -3496,7 +3517,6 @@ func (w *Workflows) register(worker worker.Worker) {
 	worker.RegisterWorkflow(w.AwaitWithOptions)
 	worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate)
 	worker.RegisterWorkflow(w.WorkflowWithUpdate)
-
 	worker.RegisterWorkflow(w.child)
 	worker.RegisterWorkflow(w.childWithRetryPolicy)
 	worker.RegisterWorkflow(w.childWithCustomRetryPolicy)