Skip to content

Commit aa89bb7

Browse files
authored
[poller autoscaler] fix logic to identify empty tasks (#1192)
1 parent db5eb67 commit aa89bb7

File tree

4 files changed

+28
-10
lines changed

4 files changed

+28
-10
lines changed

internal/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
524524
if options != nil {
525525
return FeatureFlags{
526526
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
527+
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
527528
}
528529
}
529530
return FeatureFlags{}

internal/internal_poller_autoscaler.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,17 +171,31 @@ func (m *pollerUsageEstimator) Reset() {
171171

172172
// CollectUsage counts past poll results to estimate autoscaler.Usages
173173
func (m *pollerUsageEstimator) CollectUsage(data interface{}) error {
174-
switch v := data.(type) {
175-
case *polledTask:
176-
if v == nil { // no-task poll
177-
m.atomicBits.Add(1 << 32)
178-
} else {
179-
m.atomicBits.Add(1)
180-
}
174+
isEmpty, err := isTaskEmpty(data)
175+
if err != nil {
176+
return err
177+
}
178+
if isEmpty { // no-task poll
179+
m.atomicBits.Add(1 << 32)
180+
} else {
181+
m.atomicBits.Add(1)
181182
}
182183
return nil
183184
}
184185

186+
func isTaskEmpty(task interface{}) (bool, error) {
187+
switch t := task.(type) {
188+
case *workflowTask:
189+
return t == nil || t.task == nil, nil
190+
case *activityTask:
191+
return t == nil || t.task == nil, nil
192+
case *localActivityTask:
193+
return t == nil || t.workflowTask == nil, nil
194+
default:
195+
return false, errors.New("unknown task type")
196+
}
197+
}
198+
185199
// Estimate is based on past poll counts
186200
func (m *pollerUsageEstimator) Estimate() (autoscaler.Usages, error) {
187201
bits := m.atomicBits.Load()

internal/internal_poller_autoscaler_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package internal
2323
import (
2424
"github.com/stretchr/testify/assert"
2525
"go.uber.org/atomic"
26+
s "go.uber.org/cadence/.gen/go/shared"
2627
"go.uber.org/cadence/internal/common/autoscaler"
2728
"go.uber.org/zap/zaptest"
2829
"math/rand"
@@ -270,10 +271,10 @@ type unrelatedPolledTask struct{}
270271
func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} {
271272
var result []interface{}
272273
for i := 0; i < noTaskPoll; i++ {
273-
result = append(result, (*polledTask)(nil))
274+
result = append(result, &activityTask{})
274275
}
275276
for i := 0; i < taskPoll; i++ {
276-
result = append(result, &polledTask{})
277+
result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}})
277278
}
278279
for i := 0; i < unrelated; i++ {
279280
result = append(result, &unrelatedPolledTask{})

internal/internal_worker_base.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,9 @@ func (bw *baseWorker) pollTask() {
306306
} else {
307307
if bw.pollerAutoScaler != nil {
308308
if pErr := bw.pollerAutoScaler.CollectUsage(task); pErr != nil {
309-
bw.logger.Warn("poller auto scaler collect usage error", zap.Error(pErr))
309+
bw.logger.Sugar().Warnw("poller auto scaler collect usage error",
310+
"error", pErr,
311+
"task", task)
310312
}
311313
}
312314
bw.retrier.Succeeded()

0 commit comments

Comments
 (0)