Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
task := &workflowTask{
task: response,
historyIterator: historyIterator,
autoConfigHint: response.GetAutoConfigHint(),
}
return task
}
Expand Down Expand Up @@ -1096,7 +1097,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
}
if response == nil || len(response.TaskToken) == 0 {
atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1)
return nil, startTime, nil
return response, startTime, nil
}

return response, startTime, err
Expand Down
348 changes: 347 additions & 1 deletion internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,352 @@ func TestProcessTask_failures(t *testing.T) {
})
}

func TestActivityTaskPoller_PollTask(t *testing.T) {
tests := []struct {
name string
setupPoller func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient)
setupMocks func(*workflowservicetest.MockClient)
expectedResult interface{}
expectedError error
validateResult func(t *testing.T, result interface{})
}{
{
name: "success with valid activity task",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
&s.PollForActivityTaskRequest{
Domain: common.StringPtr(_testDomainName),
TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(_testTaskList)}),
Identity: common.StringPtr(_testIdentity),
TaskListMetadata: &s.TaskListMetadata{MaxTasksPerSecond: common.Float64Ptr(0.0)},
},
gomock.Any(),
).Return(&s.PollForActivityTaskResponse{
TaskToken: []byte("test-task-token"),
WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow")},
ActivityId: common.StringPtr("test-activity"),
ActivityType: &s.ActivityType{Name: common.StringPtr("TestActivity")},
WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")},
ScheduledTimestampOfThisAttempt: common.Int64Ptr(time.Now().UnixNano()),
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
activityTask, ok := result.(*activityTask)
assert.True(t, ok, "result should be *activityTask")
assert.NotNil(t, activityTask.task)
assert.Equal(t, []byte("test-task-token"), activityTask.task.TaskToken)
assert.NotNil(t, activityTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint)
},
},
{
name: "success with empty task (no work available)",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForActivityTaskResponse{
TaskToken: []byte{}, // Empty task token indicates no work
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
activityTask, ok := result.(*activityTask)
assert.True(t, ok, "result should be *activityTask")
assert.Nil(t, activityTask.task)
assert.NotNil(t, activityTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, activityTask.autoConfigHint)
},
},
{
name: "service error during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.InternalServiceError{Message: "service unavailable"})
},
expectedError: &s.InternalServiceError{Message: "service unavailable"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "service busy error during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.ServiceBusyError{Message: "service busy"})
},
expectedError: &s.ServiceBusyError{Message: "service busy"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "poller shutting down",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, true) // shutdown = true
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
// No mock setup needed as doPoll should return early due to shutdown
},
expectedError: errShutdown,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "context timeout during poll",
setupPoller: func(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
return buildActivityTaskPoller(t, false)
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForActivityTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, context.DeadlineExceeded)
},
expectedError: context.DeadlineExceeded,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
poller, mockService := tt.setupPoller(t)
tt.setupMocks(mockService)

result, err := poller.PollTask()

if tt.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedError, err)
} else {
assert.NoError(t, err)
}

tt.validateResult(t, result)
})
}
}

func buildActivityTaskPoller(t *testing.T, shutdown bool) (*activityTaskPoller, *workflowservicetest.MockClient) {
ctrl := gomock.NewController(t)
mockService := workflowservicetest.NewMockClient(ctrl)

var shutdownC <-chan struct{}
if shutdown {
ch := make(chan struct{})
close(ch)
shutdownC = ch
} else {
shutdownC = make(<-chan struct{})
}

return &activityTaskPoller{
basePoller: basePoller{
shutdownC: shutdownC,
},
domain: _testDomainName,
taskListName: _testTaskList,
identity: _testIdentity,
service: mockService,
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
logger: testlogger.NewZap(t),
activitiesPerSecond: 0.0,
featureFlags: FeatureFlags{},
}, mockService
}

func TestWorkflowTaskPoller_PollTask(t *testing.T) {
tests := []struct {
name string
setupPoller func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient)
setupMocks func(*workflowservicetest.MockClient)
expectedResult interface{}
expectedError error
validateResult func(t *testing.T, result interface{})
}{
{
name: "success with valid workflow task",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForDecisionTaskResponse{
TaskToken: []byte("test-task-token"),
WorkflowExecution: &s.WorkflowExecution{WorkflowId: common.StringPtr("test-workflow"), RunId: common.StringPtr("test-run")},
WorkflowType: &s.WorkflowType{Name: common.StringPtr("TestWorkflow")},
StartedEventId: common.Int64Ptr(1),
NextEventId: common.Int64Ptr(2),
ScheduledTimestamp: common.Int64Ptr(time.Now().UnixNano()),
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
History: &s.History{Events: []*s.HistoryEvent{{EventId: common.Int64Ptr(1)}}},
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
workflowTask, ok := result.(*workflowTask)
assert.True(t, ok, "result should be *workflowTask")
assert.NotNil(t, workflowTask.task)
assert.Equal(t, []byte("test-task-token"), workflowTask.task.TaskToken)
assert.NotNil(t, workflowTask.historyIterator)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint)
},
},
{
name: "success with empty task (no work available)",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(&s.PollForDecisionTaskResponse{
TaskToken: []byte{}, // Empty task token indicates no work
AutoConfigHint: &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)},
}, nil)
},
expectedError: nil,
validateResult: func(t *testing.T, result interface{}) {
assert.NotNil(t, result)
workflowTask, ok := result.(*workflowTask)
assert.True(t, ok, "result should be *workflowTask")
assert.Nil(t, workflowTask.task)
assert.NotNil(t, workflowTask.autoConfigHint)
assert.Equal(t, &s.AutoConfigHint{PollerWaitTimeInMs: common.Int64Ptr(1000)}, workflowTask.autoConfigHint)
},
},
{
name: "service error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.InternalServiceError{Message: "service unavailable"})
},
expectedError: &s.InternalServiceError{Message: "service unavailable"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "service busy error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.ServiceBusyError{Message: "service busy"})
},
expectedError: &s.ServiceBusyError{Message: "service busy"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "context timeout during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, context.DeadlineExceeded)
},
expectedError: context.DeadlineExceeded,
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
{
name: "domain not exists error during poll",
setupPoller: func(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient) {
wP, mockService, _, _ := buildWorkflowTaskPoller(t)
return wP, mockService
},
setupMocks: func(mockService *workflowservicetest.MockClient) {
mockService.EXPECT().PollForDecisionTask(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, &s.EntityNotExistsError{Message: "domain does not exist"})
},
expectedError: &s.EntityNotExistsError{Message: "domain does not exist"},
validateResult: func(t *testing.T, result interface{}) {
assert.Nil(t, result)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
poller, mockService := tt.setupPoller(t)
tt.setupMocks(mockService)

result, err := poller.PollTask()

if tt.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedError, err)
} else {
assert.NoError(t, err)
}

tt.validateResult(t, result)
})
}
}

func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *mockLocalDispatcher) {
ctrl := gomock.NewController(t)
mockService := workflowservicetest.NewMockClient(ctrl)
Expand All @@ -207,7 +553,7 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic
ldaTunnel: lda,
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
logger: testlogger.NewZap(t),
stickyUUID: "",
stickyUUID: "sticky-uuid",
disableStickyExecution: false,
StickyScheduleToStartTimeout: time.Millisecond,
featureFlags: FeatureFlags{},
Expand Down
Loading