@@ -39,7 +39,9 @@ import (
39
39
"github.com/stretchr/testify/suite"
40
40
"go.uber.org/goleak"
41
41
"go.uber.org/zap"
42
+ "go.uber.org/zap/zapcore"
42
43
"go.uber.org/zap/zaptest"
44
+ "go.uber.org/zap/zaptest/observer"
43
45
44
46
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
45
47
s "go.uber.org/cadence/.gen/go/shared"
@@ -826,6 +828,69 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
826
828
t .NotNil (request )
827
829
}
828
830
831
+ func (t * TaskHandlersTestSuite ) TestWorkflowTask_NondeterministicLogNonexistingID () {
832
+ taskList := "taskList"
833
+ testEvents := []* s.HistoryEvent {
834
+ createTestEventWorkflowExecutionStarted (1 , & s.WorkflowExecutionStartedEventAttributes {TaskList : & s.TaskList {Name : & taskList }}),
835
+ createTestEventDecisionTaskScheduled (2 , & s.DecisionTaskScheduledEventAttributes {TaskList : & s.TaskList {Name : & taskList }}),
836
+ createTestEventDecisionTaskStarted (3 ),
837
+ createTestEventDecisionTaskCompleted (4 , & s.DecisionTaskCompletedEventAttributes {ScheduledEventId : common .Int64Ptr (2 )}),
838
+ createTestEventActivityTaskScheduled (5 , & s.ActivityTaskScheduledEventAttributes {
839
+ // Insert an ID which does not exist
840
+ ActivityId : common .StringPtr ("NotAnActivityID" ),
841
+ ActivityType : & s.ActivityType {Name : common .StringPtr ("pkg.Greeter_Activity" )},
842
+ TaskList : & s.TaskList {Name : & taskList },
843
+ }),
844
+ }
845
+
846
+ obs , logs := observer .New (zap .ErrorLevel )
847
+ logger := zap .New (obs )
848
+
849
+ task := createWorkflowTask (testEvents , 3 , "HelloWorld_Workflow" )
850
+ stopC := make (chan struct {})
851
+ params := workerExecutionParameters {
852
+ TaskList : taskList ,
853
+ WorkerOptions : WorkerOptions {
854
+ Identity : "test-id-1" ,
855
+ Logger : logger ,
856
+ },
857
+ WorkerStopChannel : stopC ,
858
+ }
859
+
860
+ taskHandler := newWorkflowTaskHandler (testDomain , params , nil , t .registry )
861
+ request , err := taskHandler .ProcessWorkflowTask (& workflowTask {task : task }, nil )
862
+
863
+ t .NotNil (request )
864
+ response := request .(* s.RespondDecisionTaskFailedRequest )
865
+
866
+ // NOTE: we might acctually want to return an error
867
+ // but since previously we checked the wrong error type, it may break existing customers workflow
868
+ // The issue is that we change the error type and that we change the error message, the customers
869
+ // are checking the error string - we plan to wrap all errors to avoid this issue in client v2
870
+ t .NoError (err )
871
+ t .NotNil (response )
872
+
873
+ // Check that the error was logged
874
+ ignoredWorkflowLogs := logs .FilterMessage ("Ignored workflow panic error" )
875
+ require .Len (t .T (), ignoredWorkflowLogs .All (), 1 )
876
+
877
+ // Find the ReplayError field
878
+ withField := ignoredWorkflowLogs .All ()[0 ]
879
+ var replayErrorField * zapcore.Field
880
+ for _ , field := range withField .Context {
881
+ if field .Key == "ReplayError" {
882
+ replayErrorField = & field
883
+ }
884
+ }
885
+ require .NotNil (t .T (), replayErrorField )
886
+ require .Equal (t .T (), zapcore .ErrorType , replayErrorField .Type )
887
+ require .ErrorContains (t .T (), replayErrorField .Interface .(error ),
888
+ "nondeterministic workflow: " +
889
+ "history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), " +
890
+ "replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)" )
891
+
892
+ }
893
+
829
894
func (t * TaskHandlersTestSuite ) TestWorkflowTask_WorkflowReturnsPanicError () {
830
895
taskList := "taskList"
831
896
testEvents := []* s.HistoryEvent {
0 commit comments