Skip to content

Commit d9dcddc

Browse files
authored
Populate tasks in internal workflowTask and activityTask entities for empty polls (#1416)
What changed? ensure task field is populated in workflowTask and activityTask for empty polls. Why? Task response even with empty task contains information like AutoConfigHint, which is needed for poller auto scaler How did you test it? Unit Test
1 parent 6c105e0 commit d9dcddc

File tree

4 files changed

+122
-7
lines changed

4 files changed

+122
-7
lines changed

internal/internal_poller_autoscaler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,9 @@ func (m *pollerUsageEstimator) CollectUsage(data interface{}) error {
174174
func isTaskEmpty(task interface{}) (bool, error) {
175175
switch t := task.(type) {
176176
case *workflowTask:
177-
return t == nil || t.task == nil, nil
177+
return t == nil || t.task == nil || len(t.task.TaskToken) == 0, nil
178178
case *activityTask:
179-
return t == nil || t.task == nil, nil
179+
return t == nil || t.task == nil || len(t.task.TaskToken) == 0, nil
180180
case *localActivityTask:
181181
return t == nil || t.workflowTask == nil, nil
182182
default:

internal/internal_poller_autoscaler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,10 @@ type unrelatedPolledTask struct{}
278278
func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} {
279279
var result []interface{}
280280
for i := 0; i < noTaskPoll; i++ {
281-
result = append(result, &activityTask{})
281+
result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}})
282282
}
283283
for i := 0; i < taskPoll; i++ {
284-
result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}})
284+
result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{TaskToken: []byte("some value")}})
285285
}
286286
for i := 0; i < unrelated; i++ {
287287
result = append(result, &unrelatedPolledTask{})

internal/internal_task_pollers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
848848
if response == nil || len(response.TaskToken) == 0 {
849849
wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1)
850850
wtp.updateBacklog(request.TaskList.GetKind(), 0)
851-
return &workflowTask{}, nil
851+
return &workflowTask{task: response}, nil
852852
}
853853

854854
wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint())
@@ -1095,7 +1095,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
10951095
}
10961096
if response == nil || len(response.TaskToken) == 0 {
10971097
atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1)
1098-
return nil, startTime, nil
1098+
return response, startTime, nil
10991099
}
11001100

11011101
return response, startTime, err
@@ -1116,7 +1116,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
11161116
return nil, err
11171117
}
11181118
if response == nil || len(response.TaskToken) == 0 {
1119-
return &activityTask{}, nil
1119+
return &activityTask{task: response}, nil
11201120
}
11211121

11221122
workflowType := response.WorkflowType.GetName()

internal/internal_task_pollers_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,104 @@ func Test_newWorkflowTaskPoller(t *testing.T) {
6161
})
6262
}
6363

64+
func TestWorkflowTaskPoller(t *testing.T) {
65+
t.Run("PollTask", func(t *testing.T) {
66+
task := &s.PollForDecisionTaskResponse{
67+
TaskToken: []byte("some value"),
68+
AutoConfigHint: &s.AutoConfigHint{
69+
common.PtrOf(true),
70+
common.PtrOf(int64(1000)),
71+
},
72+
}
73+
emptyTask := &s.PollForDecisionTaskResponse{
74+
TaskToken: nil,
75+
AutoConfigHint: &s.AutoConfigHint{
76+
common.PtrOf(true),
77+
common.PtrOf(int64(1000)),
78+
},
79+
}
80+
for _, tt := range []struct {
81+
name string
82+
response *s.PollForDecisionTaskResponse
83+
expected *workflowTask
84+
}{
85+
{
86+
"success with task",
87+
task,
88+
&workflowTask{
89+
task: task,
90+
},
91+
},
92+
{
93+
"success with empty task",
94+
emptyTask,
95+
&workflowTask{
96+
task: emptyTask,
97+
},
98+
},
99+
} {
100+
t.Run(tt.name, func(t *testing.T) {
101+
poller, client, _, _ := buildWorkflowTaskPoller(t)
102+
client.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil)
103+
result, err := poller.PollTask()
104+
assert.NoError(t, err)
105+
resultTask, ok := result.(*workflowTask)
106+
assert.True(t, ok)
107+
assert.Equal(t, tt.expected.task, resultTask.task)
108+
})
109+
}
110+
})
111+
}
112+
113+
func TestActivityTaskPoller(t *testing.T) {
114+
t.Run("PollTask", func(t *testing.T) {
115+
task := &s.PollForActivityTaskResponse{
116+
TaskToken: []byte("some value"),
117+
AutoConfigHint: &s.AutoConfigHint{
118+
common.PtrOf(true),
119+
common.PtrOf(int64(1000)),
120+
},
121+
}
122+
emptyTask := &s.PollForActivityTaskResponse{
123+
TaskToken: nil,
124+
AutoConfigHint: &s.AutoConfigHint{
125+
common.PtrOf(true),
126+
common.PtrOf(int64(1000)),
127+
},
128+
}
129+
for _, tt := range []struct {
130+
name string
131+
response *s.PollForActivityTaskResponse
132+
expected *activityTask
133+
}{
134+
{
135+
"success with task",
136+
task,
137+
&activityTask{
138+
task: task,
139+
},
140+
},
141+
{
142+
"success with empty task",
143+
emptyTask,
144+
&activityTask{
145+
task: emptyTask,
146+
},
147+
},
148+
} {
149+
t.Run(tt.name, func(t *testing.T) {
150+
poller, client := buildActivityTaskPoller(t)
151+
client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil)
152+
result, err := poller.PollTask()
153+
assert.NoError(t, err)
154+
resultTask, ok := result.(*activityTask)
155+
assert.True(t, ok)
156+
assert.Equal(t, tt.expected.task, resultTask.task)
157+
})
158+
}
159+
})
160+
}
161+
64162
func TestLocalActivityPanic(t *testing.T) {
65163
// regression: panics in local activities should not terminate the process
66164
s := WorkflowTestSuite{logger: testlogger.NewZap(t)}
@@ -213,3 +311,20 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic
213311
featureFlags: FeatureFlags{},
214312
}, mockService, taskHandler, lda
215313
}
314+
315+
func buildActivityTaskPoller(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
316+
ctrl := gomock.NewController(t)
317+
mockService := workflowservicetest.NewMockClient(ctrl)
318+
return &activityTaskPoller{
319+
basePoller: basePoller{
320+
shutdownC: make(<-chan struct{}),
321+
},
322+
domain: _testDomainName,
323+
taskListName: _testTaskList,
324+
identity: _testIdentity,
325+
service: mockService,
326+
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
327+
logger: testlogger.NewZap(t),
328+
featureFlags: FeatureFlags{},
329+
}, mockService
330+
}

0 commit comments

Comments
 (0)