@@ -23,9 +23,11 @@ import (
2323 "go.temporal.io/api/workflowservice/v1"
2424 sdkclient "go.temporal.io/sdk/client"
2525 "go.temporal.io/sdk/workflow"
26+ "go.temporal.io/server/common/dynamicconfig"
2627 "go.temporal.io/server/common/log/tag"
2728 "go.temporal.io/server/common/payloads"
2829 "go.temporal.io/server/common/testing/protoutils"
30+ "go.temporal.io/server/common/testing/taskpoller"
2931 "go.temporal.io/server/common/testing/testvars"
3032 "go.temporal.io/server/service/history/api/resetworkflow"
3133 "go.temporal.io/server/tests/testcore"
@@ -992,6 +994,8 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflow_ResetAfterContinueAsNew() {
992994}
993995
994996func (s * ResetWorkflowTestSuite ) TestResetWorkflowWithExternalPayloads () {
997+ s .OverrideDynamicConfig (dynamicconfig .ExternalPayloadsEnabled , true )
998+
995999 // This test verifies that ExternalPayloadSize and ExternalPayloadCount are correctly
9961000 // tracked when a workflow is reset. It resets to a point before the activity completes,
9971001 // so only the workflow input external payload should be counted.
@@ -1040,7 +1044,7 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() {
10401044 // Workflow handler - schedules activity on first task, completes on second task
10411045 isFirstTaskProcessed := false
10421046 workflowComplete := false
1043- wtHandler := func (task * workflowservice.PollWorkflowTaskQueueResponse ) ([]* commandpb.Command , error ) {
1047+ wtHandler := func (_ * workflowservice.PollWorkflowTaskQueueResponse ) ([]* commandpb.Command , error ) {
10441048 if ! isFirstTaskProcessed {
10451049 isFirstTaskProcessed = true
10461050 // Schedule an activity
@@ -1072,33 +1076,29 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() {
10721076 }}, nil
10731077 }
10741078
1075- // activity handler
1076- atHandler := func (task * workflowservice.PollActivityTaskQueueResponse ) (* commonpb.Payloads , bool , error ) {
1077- return payloads .EncodeString ("Activity Result" ), false , nil
1078- }
1079- poller := & testcore.TaskPoller {
1080- Client : s .FrontendClient (),
1081- Namespace : s .Namespace ().String (),
1082- TaskQueue : & taskqueuepb.TaskQueue {Name : taskQueue , Kind : enumspb .TASK_QUEUE_KIND_NORMAL },
1083- Identity : identity ,
1084- WorkflowTaskHandler : wtHandler ,
1085- ActivityTaskHandler : atHandler ,
1086- Logger : s .Logger ,
1087- T : s .T (),
1088- }
1079+ tv := testvars .New (s .T ()).WithTaskQueue (taskQueue )
1080+ poller := taskpoller .New (s .T (), s .FrontendClient (), s .Namespace ().String ())
10891081
10901082 // Process first workflow task to schedule activities
1091- _ , err := poller .PollAndProcessWorkflowTask ()
1083+ _ , err := poller .PollAndHandleWorkflowTask (tv , func (task * workflowservice.PollWorkflowTaskQueueResponse ) (* workflowservice.RespondWorkflowTaskCompletedRequest , error ) {
1084+ cmds , err := wtHandler (task )
1085+ return & workflowservice.RespondWorkflowTaskCompletedRequest {Commands : cmds }, err
1086+ })
10921087 s .Logger .Info ("PollAndProcessWorkflowTask" , tag .Error (err ))
10931088 s .NoError (err )
10941089
10951090 // Process one activity task which also creates second workflow task
1096- err = poller .PollAndProcessActivityTask (false )
1091+ _ , err = poller .PollAndHandleActivityTask (tv , func (task * workflowservice.PollActivityTaskQueueResponse ) (* workflowservice.RespondActivityTaskCompletedRequest , error ) {
1092+ return & workflowservice.RespondActivityTaskCompletedRequest {Result : payloads .EncodeString ("Activity Result" )}, nil
1093+ })
10971094 s .Logger .Info ("Poll and process first activity" , tag .Error (err ))
10981095 s .NoError (err )
10991096
11001097 // Process second workflow task which checks activity completion
1101- _ , err = poller .PollAndProcessWorkflowTask ()
1098+ _ , err = poller .PollAndHandleWorkflowTask (tv , func (task * workflowservice.PollWorkflowTaskQueueResponse ) (* workflowservice.RespondWorkflowTaskCompletedRequest , error ) {
1099+ cmds , err := wtHandler (task )
1100+ return & workflowservice.RespondWorkflowTaskCompletedRequest {Commands : cmds }, err
1101+ })
11021102 s .Logger .Info ("Poll and process second workflow task" , tag .Error (err ))
11031103 s .NoError (err )
11041104
@@ -1123,7 +1123,7 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() {
11231123 break
11241124 }
11251125 }
1126- s .Greater (resetToEventID , int64 ( 0 ) , "Should have found first completed workflow task" )
1126+ s .Positive (resetToEventID , "Should have found first completed workflow task" )
11271127
11281128 resetResp , err := s .FrontendClient ().ResetWorkflowExecution (testcore .NewContext (), & workflowservice.ResetWorkflowExecutionRequest {
11291129 Namespace : s .Namespace ().String (),
0 commit comments