Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
task := &workflowTask{
task: response,
historyIterator: historyIterator,
autoConfigHint: response.GetAutoConfigHint(),
}
return task
}
Expand Down Expand Up @@ -1096,7 +1097,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
}
if response == nil || len(response.TaskToken) == 0 {
atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1)
return nil, startTime, nil
return response, startTime, nil
}

return response, startTime, err
Expand Down Expand Up @@ -1129,7 +1130,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
scheduledToStartLatency := time.Duration(response.GetStartedTimestamp() - response.GetScheduledTimestampOfThisAttempt())
metricsScope.Timer(metrics.ActivityScheduledToStartLatency).Record(scheduledToStartLatency)

return &activityTask{task: response, pollStartTime: startTime}, nil
return &activityTask{task: response, pollStartTime: startTime, autoConfigHint: response.GetAutoConfigHint()}, nil
}

// PollTask polls a new task
Expand Down
348 changes: 347 additions & 1 deletion internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,352 @@ func TestProcessTask_failures(t *testing.T) {
})
}

func TestActivityTaskPoller_PollTask(t *testing.T) {
tests := []struct {
name string
setupPoller func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient)
setupMocks func(*workflowservicetest.MockClient)
expectedResult interface{}
expectedError error
validateResult func(t *testing.T, result interface{})
}{
{
name: "success with valid activity task",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
&s.PollForActivityTaskRequest{
Domain: common.StringPtr(_testDomainName),
TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(_testTaskList)}),
Identity: common.StringPtr(_testIdentity),
TaskListMetadata: &s.TaskListMetadata{MaxTasksPerSecond: common.Float64Ptr(0.0)},
},
gomock.Any(),
).Return(&s.PollForActivityTaskResponse{
TaskToken: []byte("test-task-token"),
WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow")},
ActivityId: common.StringPtr("test-activity"),
ActivityType: &s.ActivityType{Name: common.StringPtr("TestActivity")},
WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")},
ScheduledTimestampOfThisAttempt: common.Int64Ptr(time.Now().UnixNano()),
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
activityTask, ok := result.(*activityTask)
assert.True(t, ok, "result should be *activityTask")
assert.NotNil(t, activityTask.task)
assert.Equal(t, []byte("test-task-token"), activityTask.task.TaskToken)
assert.NotNil(t, activityTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint)
},
},
{
name: "success with empty task (no work available)",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForActivityTaskResponse{
TaskToken: []byte{}, // Empty task token indicates no work
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
activityTask, ok := result.(*activityTask)
assert.True(t, ok, "result should be *activityTask")
assert.Nil(t, activityTask.task)
assert.NotNil(t, activityTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint)
},
},
{
name: "service error during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.InternalServiceError{Message: "service unavailable"})
},
expectedError: &s.InternalServiceError{Message: "service unavailable"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "service busy error during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.ServiceBusyError{Message: "service busy"})
},
expectedError: &s.ServiceBusyError{Message: "service busy"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "poller shutting down",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, true) // shutdown = true
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
// No mock setup needed as doPoll should return early due to shutdown
},
expectedError: errShutdown,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "context timeout during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, context.DeadlineExceeded)
},
expectedError: context.DeadlineExceeded,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
poller, mockService := tt.setupPoller(t)
tt.setupMocks(mockService)

result, err := poller.PollTask()

if tt.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedError, err)
} else {
assert.NoError(t, err)
}

tt.validateResult(t, result)
})
}
}

func buildActivityTaskPoller(t *testing.T, shutdown bool) (*activityTaskPoller, *workflowservicetest.MockClient) {
ctrl := gomock.NewController(t)
mockService := workflowservicetest.NewMockClient(ctrl)

var shutdownC <-chan struct{}
if shutdown {
ch := make(chan struct{})
close(ch)
shutdownC = ch
} else {
shutdownC = make(<-chan struct{})
}

return &activityTaskPoller{
basePoller: basePoller{
shutdownC: shutdownC,
},
domain: _testDomainName,
taskListName: _testTaskList,
identity: _testIdentity,
service: mockService,
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
logger: testlogger.NewZap(t),
activitiesPerSecond: 0.0,
featureFlags: FeatureFlags{},
}, mockService
}

func TestWorkflowTaskPoller_PollTask(t *testing.T) {
tests := []struct {
name string
setupPoller func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient)
setupMocks func(*workflowservicetest.MockClient)
expectedResult interface{}
expectedError error
validateResult func(t *testing.T, result interface{})
}{
{
name: "success with valid workflow task",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForDecisionTaskResponse{
TaskToken: []byte("test-task-token"),
WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow"), RunId: common.StringPtr("test-run")},
WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")},
StartedEventId: common.Int64Ptr(1),
NextEventId: common.Int64Ptr(2),
ScheduledTimestamp: common.Int64Ptr(time.Now().UnixNano()),
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
History: &s.History{Events: []*s.HistoryEvent{{EventId: common.Int64Ptr(1)}}},
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
workflowTask, ok := result.(*workflowTask)
assert.True(t, ok, "result should be *workflowTask")
assert.NotNil(t, workflowTask.task)
assert.Equal(t, []byte("test-task-token"), workflowTask.task.TaskToken)
assert.NotNil(t, workflowTask.historyIterator)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint)
},
},
{
name: "success with empty task (no work available)",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForDecisionTaskResponse{
TaskToken: []byte{}, // Empty task token indicates no work
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
workflowTask, ok := result.(*workflowTask)
assert.True(t, ok, "result should be *workflowTask")
assert.Nil(t, workflowTask.task)
assert.NotNil(t, workflowTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint)
},
},
{
name: "service error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.InternalServiceError{Message: "service unavailable"})
},
expectedError: &s.InternalServiceError{Message: "service unavailable"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "service busy error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.ServiceBusyError{Message: "service busy"})
},
expectedError: &s.ServiceBusyError{Message: "service busy"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "context timeout during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, context.DeadlineExceeded)
},
expectedError: context.DeadlineExceeded,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "domain not exists error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.EntityNotExistsError{Message: "domain does not exist"})
},
expectedError: &s.EntityNotExistsError{Message: "domain does not exist"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
poller, mockService := tt.setupPoller(t)
tt.setupMocks(mockService)

result, err := poller.PollTask()

if tt.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedError, err)
} else {
assert.NoError(t, err)
}

tt.validateResult(t, result)
})
}
}

func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *mockLocalDispatcher) {
ctrl := gomock.NewController(t)
mockService := workflowservicetest.NewMockClient(ctrl)
Expand All @@ -207,7 +553,7 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic
ldaTunnel: lda,
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
logger: testlogger.NewZap(t),
stickyUUID: "",
stickyUUID: "sticky-uuid",
disableStickyExecution: false,
StickyScheduleToStartTimeout: time.Millisecond,
featureFlags: FeatureFlags{},
Expand Down