Skip to content

Commit b467b01

Browse files
committed
address comments except for exposing poller wait time and metrics fixing
1 parent 6b028ad commit b467b01

File tree

5 files changed

+115
-60
lines changed

5 files changed

+115
-60
lines changed

internal/internal_task_handlers.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,23 @@ type (
166166
)
167167

168168
func (t *workflowTask) getAutoConfigHint() *s.AutoConfigHint {
169+
if t.autoConfigHint != nil {
170+
return t.autoConfigHint
171+
}
169172
if t.task != nil {
170173
return t.task.AutoConfigHint
171174
}
172-
return t.autoConfigHint
175+
return nil
173176
}
174177

175178
func (t *activityTask) getAutoConfigHint() *s.AutoConfigHint {
179+
if t.autoConfigHint != nil {
180+
return t.autoConfigHint
181+
}
176182
if t.task != nil {
177183
return t.task.AutoConfigHint
178184
}
179-
return t.autoConfigHint
185+
return nil
180186
}
181187

182188
func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandlerImpl) *history {

internal/internal_worker_base.go

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,7 @@ func (bw *baseWorker) Start() {
217217

218218
bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1)
219219

220-
if bw.concurrencyAutoScaler != nil {
221-
bw.concurrencyAutoScaler.Start()
222-
}
220+
bw.concurrencyAutoScaler.Start()
223221

224222
for i := 0; i < bw.options.pollerCount; i++ {
225223
bw.shutdownWG.Add(1)
@@ -308,12 +306,10 @@ func (bw *baseWorker) pollTask() {
308306
var err error
309307
var task interface{}
310308

311-
if bw.concurrencyAutoScaler != nil {
312-
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
313-
defer bw.concurrency.PollerPermit.Release()
314-
} else {
315-
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
316-
}
309+
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
310+
defer bw.concurrency.PollerPermit.Release()
311+
} else {
312+
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
317313
}
318314

319315
bw.retrier.Throttle()
@@ -332,7 +328,7 @@ func (bw *baseWorker) pollTask() {
332328
bw.retrier.Failed()
333329
} else {
334330
bw.retrier.Succeeded()
335-
if t, ok := task.(autoConfigHintAwareTask); bw.concurrencyAutoScaler != nil && ok {
331+
if t, ok := task.(autoConfigHintAwareTask); ok {
336332
bw.concurrencyAutoScaler.ProcessPollerHint(t.getAutoConfigHint())
337333
}
338334
}
@@ -408,9 +404,7 @@ func (bw *baseWorker) Stop() {
408404
}
409405
close(bw.shutdownCh)
410406
bw.limiterContextCancel()
411-
if bw.concurrencyAutoScaler != nil {
412-
bw.concurrencyAutoScaler.Stop()
413-
}
407+
bw.concurrencyAutoScaler.Stop()
414408

415409
if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
416410
traceLog(func() {
@@ -424,20 +418,3 @@ func (bw *baseWorker) Stop() {
424418
}
425419
return
426420
}
427-
428-
func getAutoConfigHint(task interface{}) *shared.AutoConfigHint {
429-
switch t := task.(type) {
430-
case *workflowTask:
431-
if t.task != nil {
432-
return t.task.AutoConfigHint
433-
}
434-
return t.autoConfigHint
435-
case *activityTask:
436-
if t.task != nil {
437-
return t.task.AutoConfigHint
438-
}
439-
return t.autoConfigHint
440-
default:
441-
return nil
442-
}
443-
}

internal/internal_worker_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,11 @@ func TestGetTaskAutoConfigHint(t *testing.T) {
15421542
},
15431543
} {
15441544
t.Run(tt.name, func(t *testing.T) {
1545-
assert.Equal(t, tt.want, getAutoConfigHint(tt.task))
1545+
if task, ok := tt.task.(autoConfigHintAwareTask); ok {
1546+
assert.Equal(t, tt.want, task.getAutoConfigHint())
1547+
} else {
1548+
assert.Nil(t, tt.want)
1549+
}
15461550
})
15471551
}
15481552
}

internal/worker/concurrency_auto_scaler.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,16 @@ const (
3939
upperPollerWaitTime = 256 * time.Millisecond
4040
numberOfPollsInRollingAverage = 20
4141

42-
autoScalerEventPollerUpdate autoScalerEvent = "update-poller-limit"
43-
autoScalerEventPollerSkipUpdateCooldown = "skip-update-poller-limit-cooldown"
44-
autoScalerEventPollerSkipUpdateNoChange = "skip-update-poller-limit-no-change"
45-
autoScalerEventPollerSkipUpdateNotEnabled = "skip-update-poller-limit-not-enabled"
46-
autoScalerEventMetrics = "metrics"
47-
autoScalerEventEnable = "enable"
48-
autoScalerEventDisable = "disable"
49-
autoScalerEventStart = "start"
50-
autoScalerEventStop = "stop"
42+
autoScalerEventPollerScaleUp autoScalerEvent = "poller-limit-scale-up"
43+
autoScalerEventPollerScaleDown autoScalerEvent = "poller-limit-scale-down"
44+
autoScalerEventPollerSkipUpdateCooldown autoScalerEvent = "poller-limit-skip-update-cooldown"
45+
autoScalerEventPollerSkipUpdateNoChange autoScalerEvent = "poller-limit-skip-update-no-change"
46+
autoScalerEventPollerSkipUpdateNotEnabled autoScalerEvent = "poller-limit-skip-update-not-enabled"
47+
autoScalerEventEmitMetrics autoScalerEvent = "auto-scaler-emit-metrics"
48+
autoScalerEventEnable autoScalerEvent = "auto-scaler-enable"
49+
autoScalerEventDisable autoScalerEvent = "auto-scaler-disable"
50+
autoScalerEventStart autoScalerEvent = "auto-scaler-start"
51+
autoScalerEventStop autoScalerEvent = "auto-scaler-stop"
5152
autoScalerEventLogMsg string = "concurrency auto scaler event"
5253
testTimeFormat string = "15:04:05"
5354

@@ -118,11 +119,19 @@ func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAuto
118119
}
119120

120121
func (c *ConcurrencyAutoScaler) Start() {
122+
if c == nil {
123+
return // no-op if auto scaler is not set
124+
}
121125
c.logEvent(autoScalerEventStart)
122126

123127
c.wg.Add(1)
124128

125129
go func() {
130+
defer func() {
131+
if r := recover(); r != nil {
132+
c.log.Error("panic in concurrency auto scaler, stopping the auto scaler", zap.Any("error", r))
133+
}
134+
}()
126135
defer c.wg.Done()
127136
ticker := c.clock.NewTicker(c.updateTick)
128137
defer ticker.Stop()
@@ -132,7 +141,7 @@ func (c *ConcurrencyAutoScaler) Start() {
132141
return
133142
case <-ticker.Chan():
134143
c.lock.Lock()
135-
c.logEvent(autoScalerEventMetrics)
144+
c.logEvent(autoScalerEventEmitMetrics)
136145
c.updatePollerPermit()
137146
c.lock.Unlock()
138147
}
@@ -141,6 +150,9 @@ func (c *ConcurrencyAutoScaler) Start() {
141150
}
142151

143152
func (c *ConcurrencyAutoScaler) Stop() {
153+
if c == nil {
154+
return // no-op if auto scaler is not set
155+
}
144156
close(c.shutdownChan)
145157
c.wg.Wait()
146158
c.logEvent(autoScalerEventStop)
@@ -150,6 +162,9 @@ func (c *ConcurrencyAutoScaler) Stop() {
150162
// 1. update poller wait time
151163
// 2. enable/disable auto scaler
152164
func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
165+
if c == nil {
166+
return // no-op if auto scaler is not set
167+
}
153168
c.lock.Lock()
154169
defer c.lock.Unlock()
155170

@@ -208,19 +223,22 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
208223
return
209224
}
210225

211-
newQuota := c.concurrency.PollerPermit.Quota()
226+
var newQuota int
212227
pollerWaitTime := c.pollerWaitTime.Average()
213228
if pollerWaitTime < lowerPollerWaitTime { // pollers are busy
214229
newQuota = c.scaleUpPollerPermit(pollerWaitTime)
230+
c.concurrency.PollerPermit.SetQuota(newQuota)
231+
c.pollerPermitLastUpdate = updateTime
232+
c.logEvent(autoScalerEventPollerScaleUp)
215233
} else if pollerWaitTime > upperPollerWaitTime { // pollers are idle
216234
newQuota = c.scaleDownPollerPermit(pollerWaitTime)
235+
c.concurrency.PollerPermit.SetQuota(newQuota)
236+
c.pollerPermitLastUpdate = updateTime
237+
c.logEvent(autoScalerEventPollerScaleDown)
217238
} else {
218239
c.logEvent(autoScalerEventPollerSkipUpdateNoChange)
219240
return
220241
}
221-
c.concurrency.PollerPermit.SetQuota(newQuota)
222-
c.pollerPermitLastUpdate = updateTime
223-
c.logEvent(autoScalerEventPollerUpdate)
224242
}
225243

226244
func (c *ConcurrencyAutoScaler) scaleUpPollerPermit(pollerWaitTime time.Duration) int {

internal/worker/concurrency_auto_scaler_test.go

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func TestConcurrencyAutoScaler(t *testing.T) {
118118
{autoScalerEventStart, false, 100, "00:00:00"},
119119
{autoScalerEventEnable, true, 100, "00:00:00"},
120120
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
121-
{autoScalerEventPollerUpdate, true, 116, "00:00:02"},
121+
{autoScalerEventPollerScaleUp, true, 116, "00:00:02"},
122122
{autoScalerEventStop, true, 116, "00:00:02"},
123123
},
124124
},
@@ -132,12 +132,60 @@ func TestConcurrencyAutoScaler(t *testing.T) {
132132
{autoScalerEventStart, false, 100, "00:00:00"},
133133
{autoScalerEventEnable, true, 100, "00:00:00"},
134134
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
135-
{autoScalerEventPollerUpdate, true, 200, "00:00:02"},
135+
{autoScalerEventPollerScaleUp, true, 200, "00:00:02"},
136136
{autoScalerEventStop, true, 200, "00:00:02"},
137137
},
138138
},
139139
{
140-
"idl pollers waiting for tasks",
140+
"busy pollers, scale up to maximum, and then scale down slowly",
141+
[]*shared.AutoConfigHint{
142+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, in cool down
143+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, scale up
144+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown
145+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down
146+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown
147+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down further
148+
},
149+
[]eventLog{
150+
{autoScalerEventStart, false, 100, "00:00:00"},
151+
{autoScalerEventEnable, true, 100, "00:00:00"},
152+
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
153+
{autoScalerEventPollerScaleUp, true, 200, "00:00:02"},
154+
{autoScalerEventPollerSkipUpdateCooldown, true, 200, "00:00:03"},
155+
{autoScalerEventPollerScaleDown, true, 121, "00:00:04"},
156+
{autoScalerEventPollerSkipUpdateCooldown, true, 121, "00:00:05"},
157+
{autoScalerEventPollerScaleDown, true, 73, "00:00:06"},
158+
{autoScalerEventStop, true, 73, "00:00:06"},
159+
},
160+
},
161+
{
162+
"pollers, scale up and down multiple times",
163+
[]*shared.AutoConfigHint{
164+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, in cool down
165+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, scale up
166+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown
167+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down
168+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, skip due to cooldown
169+
{common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, scale up again
170+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown
171+
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down again
172+
},
173+
[]eventLog{
174+
{autoScalerEventStart, false, 100, "00:00:00"},
175+
{autoScalerEventEnable, true, 100, "00:00:00"},
176+
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
177+
{autoScalerEventPollerScaleUp, true, 200, "00:00:02"},
178+
{autoScalerEventPollerSkipUpdateCooldown, true, 200, "00:00:03"},
179+
{autoScalerEventPollerScaleDown, true, 121, "00:00:04"},
180+
{autoScalerEventPollerSkipUpdateCooldown, true, 121, "00:00:05"},
181+
{autoScalerEventPollerScaleUp, true, 200, "00:00:06"},
182+
{autoScalerEventPollerSkipUpdateCooldown, true, 200, "00:00:07"},
183+
{autoScalerEventPollerScaleDown, true, 121, "00:00:08"},
184+
{autoScalerEventStop, true, 121, "00:00:08"},
185+
},
186+
},
187+
{
188+
"idle pollers waiting for tasks",
141189
[]*shared.AutoConfigHint{
142190
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down
143191
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale down
@@ -146,12 +194,12 @@ func TestConcurrencyAutoScaler(t *testing.T) {
146194
{autoScalerEventStart, false, 100, "00:00:00"},
147195
{autoScalerEventEnable, true, 100, "00:00:00"},
148196
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
149-
{autoScalerEventPollerUpdate, true, 80, "00:00:02"},
197+
{autoScalerEventPollerScaleDown, true, 80, "00:00:02"},
150198
{autoScalerEventStop, true, 80, "00:00:02"},
151199
},
152200
},
153201
{
154-
"idl pollers, scale down to minimum",
202+
"idle pollers, scale down to minimum",
155203
[]*shared.AutoConfigHint{
156204
{common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, in cool down
157205
{common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, scale down
@@ -160,12 +208,12 @@ func TestConcurrencyAutoScaler(t *testing.T) {
160208
{autoScalerEventStart, false, 100, "00:00:00"},
161209
{autoScalerEventEnable, true, 100, "00:00:00"},
162210
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
163-
{autoScalerEventPollerUpdate, true, 50, "00:00:02"},
211+
{autoScalerEventPollerScaleDown, true, 50, "00:00:02"},
164212
{autoScalerEventStop, true, 50, "00:00:02"},
165213
},
166214
},
167215
{
168-
"idl pollers but disabled scaling",
216+
"idle pollers but disabled scaling",
169217
[]*shared.AutoConfigHint{
170218
{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, in cool down
171219
{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, scale up
@@ -178,7 +226,7 @@ func TestConcurrencyAutoScaler(t *testing.T) {
178226
},
179227
},
180228
{
181-
"idl pollers but disabled scaling at a later time",
229+
"idle pollers but disabled scaling at a later time",
182230
[]*shared.AutoConfigHint{
183231
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down
184232
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale down
@@ -188,14 +236,14 @@ func TestConcurrencyAutoScaler(t *testing.T) {
188236
{autoScalerEventStart, false, 100, "00:00:00"},
189237
{autoScalerEventEnable, true, 100, "00:00:00"},
190238
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
191-
{autoScalerEventPollerUpdate, true, 80, "00:00:02"},
239+
{autoScalerEventPollerScaleDown, true, 80, "00:00:02"},
192240
{autoScalerEventDisable, false, 100, "00:00:02"},
193241
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:03"},
194242
{autoScalerEventStop, false, 100, "00:00:03"},
195243
},
196244
},
197245
{
198-
"idl pollers and enabled at a later time",
246+
"idle pollers and enabled at a later time",
199247
[]*shared.AutoConfigHint{
200248
{common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, in cool down
201249
{common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, not enabled
@@ -206,7 +254,7 @@ func TestConcurrencyAutoScaler(t *testing.T) {
206254
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:01"},
207255
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:02"},
208256
{autoScalerEventEnable, true, 100, "00:00:02"},
209-
{autoScalerEventPollerUpdate, true, 80, "00:00:03"},
257+
{autoScalerEventPollerScaleDown, true, 80, "00:00:03"},
210258
{autoScalerEventStop, true, 80, "00:00:03"},
211259
},
212260
},
@@ -226,7 +274,9 @@ func TestConcurrencyAutoScaler(t *testing.T) {
226274
clock.Sleep(testTickTime / 2) // poll delay by 0.5 unit of time to avoid test flakiness
227275
for _, hint := range tt.pollAutoConfigHint {
228276
t.Log("hint process time: ", clock.Now().Format(testTimeFormat))
229-
scaler.ProcessPollerHint(hint)
277+
for i := 0; i < numberOfPollsInRollingAverage; i++ { // simulate polling multiple times to avoid flakiness
278+
scaler.ProcessPollerHint(hint)
279+
}
230280
clock.Sleep(testTickTime)
231281
}
232282
}()
@@ -249,7 +299,7 @@ func TestConcurrencyAutoScaler(t *testing.T) {
249299

250300
var actualEvents []eventLog
251301
for _, event := range obs.FilterMessage(autoScalerEventLogMsg).All() {
252-
if event.ContextMap()["event"] != autoScalerEventMetrics {
302+
if event.ContextMap()["event"] != string(autoScalerEventEmitMetrics) {
253303
t.Log("event: ", event.ContextMap())
254304
actualEvents = append(actualEvents, eventLog{
255305
eventType: autoScalerEvent(event.ContextMap()["event"].(string)),

0 commit comments

Comments
 (0)