@@ -107,6 +107,7 @@ type (
107107
108108 // baseWorkerOptions options to configure base worker.
109109 baseWorkerOptions struct {
110+ pollerAutoScaler pollerAutoScalerOptions
110111 pollerCount int
111112 pollerRate int
112113 maxConcurrentTask int
@@ -133,6 +134,7 @@ type (
133134 metricsScope tally.Scope
134135
135136 pollerRequestCh chan struct {}
137+ pollerAutoScaler * pollerAutoScaler
136138 taskQueueCh chan interface {}
137139 sessionTokenBucket * sessionTokenBucket
138140 }
@@ -156,15 +158,25 @@ func createPollRetryPolicy() backoff.RetryPolicy {
156158
157159func newBaseWorker (options baseWorkerOptions , logger * zap.Logger , metricsScope tally.Scope , sessionTokenBucket * sessionTokenBucket ) * baseWorker {
158160 ctx , cancel := context .WithCancel (context .Background ())
161+
162+ var pollerAS * pollerAutoScaler
163+ if pollerOptions := options .pollerAutoScaler ; pollerOptions .Enabled {
164+ pollerAS = newPollerScaler (
165+ pollerOptions ,
166+ logger ,
167+ )
168+ }
169+
159170 bw := & baseWorker {
160- options : options ,
161- shutdownCh : make (chan struct {}),
162- taskLimiter : rate .NewLimiter (rate .Limit (options .maxTaskPerSecond ), 1 ),
163- retrier : backoff .NewConcurrentRetrier (pollOperationRetryPolicy ),
164- logger : logger .With (zapcore.Field {Key : tagWorkerType , Type : zapcore .StringType , String : options .workerType }),
165- metricsScope : tagScope (metricsScope , tagWorkerType , options .workerType ),
166- pollerRequestCh : make (chan struct {}, options .maxConcurrentTask ),
167- taskQueueCh : make (chan interface {}), // no buffer, so poller only able to poll new task after previous is dispatched.
171+ options : options ,
172+ shutdownCh : make (chan struct {}),
173+ taskLimiter : rate .NewLimiter (rate .Limit (options .maxTaskPerSecond ), 1 ),
174+ retrier : backoff .NewConcurrentRetrier (pollOperationRetryPolicy ),
175+ logger : logger .With (zapcore.Field {Key : tagWorkerType , Type : zapcore .StringType , String : options .workerType }),
176+ metricsScope : tagScope (metricsScope , tagWorkerType , options .workerType ),
177+ pollerRequestCh : make (chan struct {}, options .maxConcurrentTask ),
178+ pollerAutoScaler : pollerAS ,
179+ taskQueueCh : make (chan interface {}), // no buffer, so poller only able to poll new task after previous is dispatched.
168180
169181 limiterContext : ctx ,
170182 limiterContextCancel : cancel ,
@@ -185,6 +197,10 @@ func (bw *baseWorker) Start() {
185197
186198 bw .metricsScope .Counter (metrics .WorkerStartCounter ).Inc (1 )
187199
200+ if bw .pollerAutoScaler != nil {
201+ bw .pollerAutoScaler .Start ()
202+ }
203+
188204 for i := 0 ; i < bw .options .pollerCount ; i ++ {
189205 bw .shutdownWG .Add (1 )
190206 go bw .runPoller ()
@@ -255,9 +271,24 @@ func (bw *baseWorker) runTaskDispatcher() {
255271 }
256272}
257273
274+ /*
275+ There are three types of constraint on polling tasks:
276+ 1. poller auto scaler is to constraint number of concurrent pollers
277+ 2. retrier is a backoff constraint on errors
278+ 3. limiter is a per-second constraint
279+ */
258280func (bw * baseWorker ) pollTask () {
259281 var err error
260282 var task interface {}
283+
284+ if bw .pollerAutoScaler != nil {
285+ if pErr := bw .pollerAutoScaler .Acquire (1 ); pErr == nil {
286+ defer bw .pollerAutoScaler .Release (1 )
287+ } else {
288+ bw .logger .Warn ("poller auto scaler acquire error" , zap .Error (pErr ))
289+ }
290+ }
291+
261292 bw .retrier .Throttle ()
262293 if bw .pollLimiter == nil || bw .pollLimiter .Wait (bw .limiterContext ) == nil {
263294 task , err = bw .options .taskWorker .PollTask ()
@@ -273,6 +304,11 @@ func (bw *baseWorker) pollTask() {
273304 }
274305 bw .retrier .Failed ()
275306 } else {
307+ if bw .pollerAutoScaler != nil {
308+ if pErr := bw .pollerAutoScaler .CollectUsage (task ); pErr != nil {
309+ bw .logger .Warn ("poller auto scaler collect usage error" , zap .Error (pErr ))
310+ }
311+ }
276312 bw .retrier .Succeeded ()
277313 }
278314 }
@@ -347,6 +383,9 @@ func (bw *baseWorker) Stop() {
347383 }
348384 close (bw .shutdownCh )
349385 bw .limiterContextCancel ()
386+ if bw .pollerAutoScaler != nil {
387+ bw .pollerAutoScaler .Stop ()
388+ }
350389
351390 if success := util .AwaitWaitGroup (& bw .shutdownWG , bw .options .shutdownTimeout ); ! success {
352391 traceLog (func () {
0 commit comments