diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 611394d5c..c2966ff1c 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -909,6 +909,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes task := &workflowTask{ task: response, historyIterator: historyIterator, + autoConfigHint: response.GetAutoConfigHint(), } return task } @@ -1096,7 +1097,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask } if response == nil || len(response.TaskToken) == 0 { atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1) - return nil, startTime, nil + return response, startTime, nil } return response, startTime, err @@ -1129,7 +1130,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context, scheduledToStartLatency := time.Duration(response.GetStartedTimestamp() - response.GetScheduledTimestampOfThisAttempt()) metricsScope.Timer(metrics.ActivityScheduledToStartLatency).Record(scheduledToStartLatency) - return &activityTask{task: response, pollStartTime: startTime}, nil + return &activityTask{task: response, pollStartTime: startTime, autoConfigHint: response.GetAutoConfigHint()}, nil } // PollTask polls a new task diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index ed0a4e779..233be56c3 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -189,6 +189,352 @@ func TestProcessTask_failures(t *testing.T) { }) } +func TestActivityTaskPoller_PollTask(t *testing.T) { + tests := []struct { + name string + setupPoller func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) + setupMocks func(*workflowservicetest.MockClient) + expectedResult interface{} + expectedError error + validateResult func(t *testing.T, result interface{}) + }{ + { + name: "success with valid activity task", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, false) + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForActivityTask( + gomock.Any(), + &s.PollForActivityTaskRequest{ + Domain: common.StringPtr(_testDomainName), + TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(_testTaskList)}), + Identity: common.StringPtr(_testIdentity), + TaskListMetadata: &s.TaskListMetadata{MaxTasksPerSecond: common.Float64Ptr(0.0)}, + }, + gomock.Any(), + ).Return(&s.PollForActivityTaskResponse{ + TaskToken: []byte("test-task-token"), + WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow")}, + ActivityId: common.StringPtr("test-activity"), + ActivityType: &s.ActivityType{Name: common.StringPtr("TestActivity")}, + WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")}, + ScheduledTimestampOfThisAttempt: common.Int64Ptr(time.Now().UnixNano()), + StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()), + AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, + }, nil) + }, + expectedError: nil, + validateResult: func(t *testing.T, result interface{}) { + assert.NotNil(t, result) + activityTask, ok := result.(*activityTask) + assert.True(t, ok, "result should be *activityTask") + assert.NotNil(t, activityTask.task) + assert.Equal(t, []byte("test-task-token"), activityTask.task.TaskToken) + assert.NotNil(t, activityTask.autoConfigHint) + assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint) + }, + }, + { + name: "success with empty task (no work available)", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, false) + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForActivityTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(&s.PollForActivityTaskResponse{ + TaskToken: []byte{}, // Empty task token indicates no work + AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, + }, nil) + }, + expectedError: nil, + validateResult: func(t *testing.T, result interface{}) { + assert.NotNil(t, result) + activityTask, ok := result.(*activityTask) + assert.True(t, ok, "result should be *activityTask") + assert.Nil(t, activityTask.task) + assert.NotNil(t, activityTask.autoConfigHint) + assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint) + }, + }, + { + name: "service error during poll", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, false) + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForActivityTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, &s.InternalServiceError{Message: "service unavailable"}) + }, + expectedError: &s.InternalServiceError{Message: "service unavailable"}, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "service busy error during poll", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, false) + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForActivityTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, &s.ServiceBusyError{Message: "service busy"}) + }, + expectedError: &s.ServiceBusyError{Message: "service busy"}, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "poller shutting down", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, true) // shutdown = true + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + // No mock setup needed as doPoll should return early due to shutdown + }, + expectedError: errShutdown, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "context timeout during poll", + setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + return buildActivityTaskPoller(t, false) + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForActivityTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, context.DeadlineExceeded) + }, + expectedError: context.DeadlineExceeded, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + poller, mockService := tt.setupPoller(t) + tt.setupMocks(mockService) + + result, err := poller.PollTask() + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError, err) + } else { + assert.NoError(t, err) + } + + tt.validateResult(t, result) + }) + } +} + +func buildActivityTaskPoller(t *testing.T, shutdown bool) (*activityTaskPoller, *workflowservicetest.MockClient) { + ctrl := gomock.NewController(t) + mockService := workflowservicetest.NewMockClient(ctrl) + + var shutdownC <-chan struct{} + if shutdown { + ch := make(chan struct{}) + close(ch) + shutdownC = ch + } else { + shutdownC = make(<-chan struct{}) + } + + return &activityTaskPoller{ + basePoller: basePoller{ + shutdownC: shutdownC, + }, + domain: _testDomainName, + taskListName: _testTaskList, + identity: _testIdentity, + service: mockService, + metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, + logger: testlogger.NewZap(t), + activitiesPerSecond: 0.0, + featureFlags: FeatureFlags{}, + }, mockService +} + +func TestWorkflowTaskPoller_PollTask(t *testing.T) { + tests := []struct { + name string + setupPoller func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) + setupMocks func(*workflowservicetest.MockClient) + expectedResult interface{} + expectedError error + validateResult func(t *testing.T, result interface{}) + }{ + { + name: "success with valid workflow task", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(&s.PollForDecisionTaskResponse{ + TaskToken: []byte("test-task-token"), + WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow"), RunId: common.StringPtr("test-run")}, + WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")}, + StartedEventId: common.Int64Ptr(1), + NextEventId: common.Int64Ptr(2), + ScheduledTimestamp: common.Int64Ptr(time.Now().UnixNano()), + StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()), + History: &s.History{Events: []*s.HistoryEvent{{EventId: common.Int64Ptr(1)}}}, + AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, + }, nil) + }, + expectedError: nil, + validateResult: func(t *testing.T, result interface{}) { + assert.NotNil(t, result) + workflowTask, ok := result.(*workflowTask) + assert.True(t, ok, "result should be *workflowTask") + assert.NotNil(t, workflowTask.task) + assert.Equal(t, []byte("test-task-token"), workflowTask.task.TaskToken) + assert.NotNil(t, workflowTask.historyIterator) + assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint) + }, + }, + { + name: "success with empty task (no work available)", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(&s.PollForDecisionTaskResponse{ + TaskToken: []byte{}, // Empty task token indicates no work + AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, + }, nil) + }, + expectedError: nil, + validateResult: func(t *testing.T, result interface{}) { + assert.NotNil(t, result) + workflowTask, ok := result.(*workflowTask) + assert.True(t, ok, "result should be *workflowTask") + assert.Nil(t, workflowTask.task) + assert.NotNil(t, workflowTask.autoConfigHint) + assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint) + }, + }, + { + name: "service error during poll", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, &s.InternalServiceError{Message: "service unavailable"}) + }, + expectedError: &s.InternalServiceError{Message: "service unavailable"}, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "service busy error during poll", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, &s.ServiceBusyError{Message: "service busy"}) + }, + expectedError: &s.ServiceBusyError{Message: "service busy"}, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "context timeout during poll", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, context.DeadlineExceeded) + }, + expectedError: context.DeadlineExceeded, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + { + name: "domain not exists error during poll", + setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) { + wP, mockService, _, _ := buildWorkflowTaskPoller(t) + return wP, mockService + }, + setupMocks: func(mockService *workflowservicetest.MockClient) { + mockService.EXPECT().PollForDecisionTask( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, &s.EntityNotExistsError{Message: "domain does not exist"}) + }, + expectedError: &s.EntityNotExistsError{Message: "domain does not exist"}, + validateResult: func(t *testing.T, result interface{}) { + assert.Nil(t, result) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + poller, mockService := tt.setupPoller(t) + tt.setupMocks(mockService) + + result, err := poller.PollTask() + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError, err) + } else { + assert.NoError(t, err) + } + + tt.validateResult(t, result) + }) + } +} + func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *mockLocalDispatcher) { ctrl := gomock.NewController(t) mockService := workflowservicetest.NewMockClient(ctrl) @@ -207,7 +553,7 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic ldaTunnel: lda, metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, logger: testlogger.NewZap(t), - stickyUUID: "", + stickyUUID: "sticky-uuid", disableStickyExecution: false, StickyScheduleToStartTimeout: time.Millisecond, featureFlags: FeatureFlags{},