Skip to content

Commit 9783bf3

Browse files
committed
read autoconfighint from empty poll
1 parent 6c105e0 commit 9783bf3

File tree

3 files changed

+134
-6
lines changed

3 files changed

+134
-6
lines changed

internal/internal_task_handlers.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,17 @@ type (
7575
// workflowTask wraps a decision task.
7676
workflowTask struct {
7777
task *s.PollForDecisionTaskResponse
78+
autoConfigHint *s.AutoConfigHint
7879
historyIterator HistoryIterator
7980
doneCh chan struct{}
8081
laResultCh chan *localActivityResult
8182
}
8283

8384
// activityTask wraps a activity task.
8485
activityTask struct {
85-
task *s.PollForActivityTaskResponse
86-
pollStartTime time.Time
86+
task *s.PollForActivityTaskResponse
87+
autoConfigHint *s.AutoConfigHint
88+
pollStartTime time.Time
8789
}
8890

8991
// resetStickinessTask wraps a ResetStickyTaskListRequest.

internal/internal_task_pollers.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,9 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
848848
if response == nil || len(response.TaskToken) == 0 {
849849
wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1)
850850
wtp.updateBacklog(request.TaskList.GetKind(), 0)
851-
return &workflowTask{}, nil
851+
return &workflowTask{
852+
autoConfigHint: response.GetAutoConfigHint(),
853+
}, nil
852854
}
853855

854856
wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint())
@@ -908,6 +910,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
908910
task := &workflowTask{
909911
task: response,
910912
historyIterator: historyIterator,
913+
autoConfigHint: response.GetAutoConfigHint(),
911914
}
912915
return task
913916
}
@@ -1095,7 +1098,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
10951098
}
10961099
if response == nil || len(response.TaskToken) == 0 {
10971100
atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1)
1098-
return nil, startTime, nil
1101+
return response, startTime, nil
10991102
}
11001103

11011104
return response, startTime, err
@@ -1116,7 +1119,9 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
11161119
return nil, err
11171120
}
11181121
if response == nil || len(response.TaskToken) == 0 {
1119-
return &activityTask{}, nil
1122+
return &activityTask{
1123+
autoConfigHint: response.GetAutoConfigHint(),
1124+
}, nil
11201125
}
11211126

11221127
workflowType := response.WorkflowType.GetName()
@@ -1128,7 +1133,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
11281133
scheduledToStartLatency := time.Duration(response.GetStartedTimestamp() - response.GetScheduledTimestampOfThisAttempt())
11291134
metricsScope.Timer(metrics.ActivityScheduledToStartLatency).Record(scheduledToStartLatency)
11301135

1131-
return &activityTask{task: response, pollStartTime: startTime}, nil
1136+
return &activityTask{task: response, pollStartTime: startTime, autoConfigHint: response.GetAutoConfigHint()}, nil
11321137
}
11331138

11341139
// PollTask polls a new task

internal/internal_task_pollers_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,110 @@ func Test_newWorkflowTaskPoller(t *testing.T) {
6161
})
6262
}
6363

64+
func TestWorkflowTaskPoller(t *testing.T) {
65+
t.Run("PollTask", func(t *testing.T) {
66+
task := &s.PollForDecisionTaskResponse{
67+
TaskToken: []byte("some value"),
68+
AutoConfigHint: &s.AutoConfigHint{
69+
common.PtrOf(true),
70+
common.PtrOf(int64(1000)),
71+
},
72+
}
73+
emptyTask := &s.PollForDecisionTaskResponse{
74+
TaskToken: nil,
75+
AutoConfigHint: &s.AutoConfigHint{
76+
common.PtrOf(true),
77+
common.PtrOf(int64(1000)),
78+
},
79+
}
80+
for _, tt := range []struct {
81+
name string
82+
response *s.PollForDecisionTaskResponse
83+
expected *workflowTask
84+
}{
85+
{
86+
"success with task",
87+
task,
88+
&workflowTask{
89+
task: task,
90+
autoConfigHint: task.AutoConfigHint,
91+
},
92+
},
93+
{
94+
"success with empty task",
95+
emptyTask,
96+
&workflowTask{
97+
task: nil,
98+
autoConfigHint: task.AutoConfigHint,
99+
},
100+
},
101+
} {
102+
t.Run(tt.name, func(t *testing.T) {
103+
poller, client, _, _ := buildWorkflowTaskPoller(t)
104+
client.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil)
105+
result, err := poller.PollTask()
106+
assert.NoError(t, err)
107+
resultTask, ok := result.(*workflowTask)
108+
assert.True(t, ok)
109+
assert.Equal(t, tt.expected.task, resultTask.task)
110+
assert.Equal(t, tt.expected.autoConfigHint, resultTask.autoConfigHint)
111+
})
112+
}
113+
})
114+
}
115+
116+
func TestActivityTaskPoller(t *testing.T) {
117+
t.Run("PollTask", func(t *testing.T) {
118+
task := &s.PollForActivityTaskResponse{
119+
TaskToken: []byte("some value"),
120+
AutoConfigHint: &s.AutoConfigHint{
121+
common.PtrOf(true),
122+
common.PtrOf(int64(1000)),
123+
},
124+
}
125+
emptyTask := &s.PollForActivityTaskResponse{
126+
TaskToken: nil,
127+
AutoConfigHint: &s.AutoConfigHint{
128+
common.PtrOf(true),
129+
common.PtrOf(int64(1000)),
130+
},
131+
}
132+
for _, tt := range []struct {
133+
name string
134+
response *s.PollForActivityTaskResponse
135+
expected *activityTask
136+
}{
137+
{
138+
"success with task",
139+
task,
140+
&activityTask{
141+
task: task,
142+
autoConfigHint: task.AutoConfigHint,
143+
},
144+
},
145+
{
146+
"success with empty task",
147+
emptyTask,
148+
&activityTask{
149+
task: nil,
150+
autoConfigHint: task.AutoConfigHint,
151+
},
152+
},
153+
} {
154+
t.Run(tt.name, func(t *testing.T) {
155+
poller, client := buildActivityTaskPoller(t)
156+
client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil)
157+
result, err := poller.PollTask()
158+
assert.NoError(t, err)
159+
resultTask, ok := result.(*activityTask)
160+
assert.True(t, ok)
161+
assert.Equal(t, tt.expected.task, resultTask.task)
162+
assert.Equal(t, tt.expected.autoConfigHint, resultTask.autoConfigHint)
163+
})
164+
}
165+
})
166+
}
167+
64168
func TestLocalActivityPanic(t *testing.T) {
65169
// regression: panics in local activities should not terminate the process
66170
s := WorkflowTestSuite{logger: testlogger.NewZap(t)}
@@ -213,3 +317,20 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic
213317
featureFlags: FeatureFlags{},
214318
}, mockService, taskHandler, lda
215319
}
320+
321+
func buildActivityTaskPoller(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) {
322+
ctrl := gomock.NewController(t)
323+
mockService := workflowservicetest.NewMockClient(ctrl)
324+
return &activityTaskPoller{
325+
basePoller: basePoller{
326+
shutdownC: make(<-chan struct{}),
327+
},
328+
domain: _testDomainName,
329+
taskListName: _testTaskList,
330+
identity: _testIdentity,
331+
service: mockService,
332+
metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)},
333+
logger: testlogger.NewZap(t),
334+
featureFlags: FeatureFlags{},
335+
}, mockService
336+
}

0 commit comments

Comments
 (0)