17 changes: 9 additions & 8 deletions internal/internal_poller_autoscaler.go
@@ -26,11 +26,11 @@ import (
"sync"
"time"

"github.com/marusama/semaphore/v2"
"go.uber.org/atomic"
"go.uber.org/zap"

"go.uber.org/cadence/internal/common/autoscaler"
"go.uber.org/cadence/internal/worker"
)

// defaultPollerScalerCooldownInSeconds
@@ -53,7 +53,7 @@ type (
isDryRun bool
cooldownTime time.Duration
logger *zap.Logger
sem semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
permit worker.Permit
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup // graceful stop
@@ -82,6 +82,7 @@ type (
func newPollerScaler(
options pollerAutoScalerOptions,
logger *zap.Logger,
permit worker.Permit,
hooks ...func()) *pollerAutoScaler {
if !options.Enabled {
return nil
@@ -91,7 +92,7 @@ func newPollerScaler(
isDryRun: options.DryRun,
cooldownTime: options.Cooldown,
logger: logger,
sem: semaphore.New(options.InitCount),
permit: permit,
wg: &sync.WaitGroup{},
ctx: ctx,
cancel: cancel,
@@ -109,17 +110,17 @@ func newPollerScaler(

// Acquire concurrent poll quota
func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
return p.sem.Acquire(p.ctx, int(resource))
return p.permit.Acquire(p.ctx, int(resource))
}

// Release concurrent poll quota
func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
p.sem.Release(int(resource))
p.permit.Release(int(resource))
}

// GetCurrent poll quota
func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
return autoscaler.ResourceUnit(p.sem.GetLimit())
return autoscaler.ResourceUnit(p.permit.Quota())
}

// Start runs an auto-scaler goroutine; canceling the scaler's context stops it
@@ -133,7 +134,7 @@ func (p *pollerAutoScaler) Start() {
case <-p.ctx.Done():
return
case <-time.After(p.cooldownTime):
currentResource := autoscaler.ResourceUnit(p.sem.GetLimit())
currentResource := autoscaler.ResourceUnit(p.permit.Quota())
currentUsages, err := p.pollerUsageEstimator.Estimate()
if err != nil {
logger.Warnw("poller autoscaler skip due to estimator error", "error", err)
@@ -146,7 +147,7 @@ func (p *pollerAutoScaler) Start() {
"recommend", uint64(proposedResource),
"isDryRun", p.isDryRun)
if !p.isDryRun {
p.sem.SetLimit(int(proposedResource))
p.permit.SetQuota(int(proposedResource))
}
p.pollerUsageEstimator.Reset()

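Taken together, this file's change swaps the scaler's privately owned resizable semaphore for an injected worker.Permit, so the pollers and the autoscaler now share a single quota object. A condensed wiring sketch of that relationship (names from this diff; a sketch, not the verbatim implementation):

	// one shared permit: pollers acquire it, the autoscaler resizes it
	permit := worker.NewPermit(options.InitCount) // initial poller quota
	scaler := newPollerScaler(options, logger, permit)

	// poller side: scaler.Acquire(1) -> permit.Acquire(ctx, 1); scaler.Release(1) -> permit.Release(1)
	// scaler side: permit.SetQuota(int(proposed)) on each cooldown tick, unless in dry-run
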
2 changes: 2 additions & 0 deletions internal/internal_poller_autoscaler_test.go
@@ -27,6 +27,7 @@ import (
"time"

"go.uber.org/cadence/internal/common/testlogger"
"go.uber.org/cadence/internal/worker"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
@@ -171,6 +172,7 @@ func Test_pollerAutoscaler(t *testing.T) {
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
},
testlogger.NewZap(t),
worker.NewPermit(tt.args.initialPollerCount),
// hook function that collects number of iterations
func() {
autoscalerEpoch.Add(1)
32 changes: 19 additions & 13 deletions internal/internal_worker_base.go
@@ -33,6 +33,7 @@ import (
"time"

"go.uber.org/cadence/internal/common/debug"
"go.uber.org/cadence/internal/worker"

"github.com/uber-go/tally"
"go.uber.org/zap"
@@ -141,7 +142,7 @@ type (
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
concurrency *worker.ConcurrencyLimit
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
@@ -167,11 +168,18 @@ func createPollRetryPolicy() backoff.RetryPolicy {
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())

concurrency := &worker.ConcurrencyLimit{
PollerPermit: worker.NewPermit(options.pollerCount),
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
}

var pollerAS *pollerAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
concurrency.PollerPermit = worker.NewPermit(pollerOptions.InitCount)
pollerAS = newPollerScaler(
pollerOptions,
logger,
concurrency.PollerPermit,
)
}

@@ -182,7 +190,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
concurrency: concurrency,
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // unbuffered, so a poller can only poll a new task after the previous one has been dispatched.
limiterContext: ctx,
@@ -244,11 +252,13 @@ func (bw *baseWorker) runPoller() {
select {
case <-bw.shutdownCh:
return
case <-bw.pollerRequestCh:
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(cap(bw.pollerRequestCh)))
// This metric is used to monitor how many poll requests have been allocated
// and can be used to approximate number of concurrent task running (not pinpoint accurate)
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(cap(bw.pollerRequestCh) - len(bw.pollerRequestCh)))
case <-bw.concurrency.TaskPermit.AcquireChan(bw.limiterContext, &bw.shutdownWG): // don't poll unless there is a task permit
// TODO: move metrics emission to a centralized place inside the worker
// emit metrics on the concurrent task permit quota and the current permit count
// NOTE: a held task permit doesn't mean a task is running; the worker still has to
// poll until it gets a task to process, so these gauges are only an estimate of
// how many tasks are running concurrently
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(bw.concurrency.TaskPermit.Quota()))
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(bw.concurrency.TaskPermit.Count()))
if bw.sessionTokenBucket != nil {
bw.sessionTokenBucket.waitForAvailableToken()
}
@@ -260,10 +270,6 @@
func (bw *baseWorker) runTaskDispatcher() {
defer bw.shutdownWG.Done()

for i := 0; i < bw.options.maxConcurrentTask; i++ {
bw.pollerRequestCh <- struct{}{}
}

for {
// wait for new task or shutdown
select {
@@ -333,7 +339,7 @@ func (bw *baseWorker) pollTask() {
case <-bw.shutdownCh:
}
} else {
bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
bw.concurrency.TaskPermit.Release(1) // poll failed, trigger a new poll by returning a task permit
}
}

@@ -368,7 +374,7 @@ func (bw *baseWorker) processTask(task interface{}) {
}

if isPolledTask {
bw.pollerRequestCh <- struct{}{}
bw.concurrency.TaskPermit.Release(1) // task processed, trigger a new poll by returning a task permit
}
}()
err := bw.options.taskWorker.ProcessTask(task)
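Read together, the changes in this file replace the buffered pollerRequestCh token channel with permit-gated polling: every poll attempt first takes a task permit via AcquireChan, and the permit is returned either when the poll fails or when the dispatched task finishes, so at most Quota() tasks are in flight. A simplified sketch of that loop (pollLoop, poll, and process are hypothetical stand-ins, not the worker's actual methods):

	import (
		"context"
		"sync"

		"go.uber.org/cadence/internal/worker"
	)

	func pollLoop(
		ctx context.Context,
		permit worker.Permit,
		wg *sync.WaitGroup,
		poll func() (interface{}, bool), // hypothetical: one poll attempt, ok=false on failure
		process func(interface{}), // hypothetical: task handler
	) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-permit.AcquireChan(ctx, wg): // hold one task permit per outstanding poll
				task, ok := poll()
				if !ok {
					permit.Release(1) // poll failed: return the permit so another poll can start
					continue
				}
				go func() {
					defer permit.Release(1) // task finished: free capacity for the next poll
					process(task)
				}()
			}
		}
	}
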
98 changes: 98 additions & 0 deletions internal/worker/concurrency.go
@@ -0,0 +1,98 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package worker

import (
"context"
"fmt"
"sync"

"github.com/marusama/semaphore/v2"
)

var _ Permit = (*permit)(nil)

// ConcurrencyLimit contains synchronization primitives for dynamically controlling concurrency in workers
type ConcurrencyLimit struct {
PollerPermit Permit // controls concurrency of pollers
TaskPermit Permit // controls concurrency of task processing
}

// Permit is an adaptive permit issuer to control concurrency
type Permit interface {
Acquire(context.Context, int) error
AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
Quota() int
SetQuota(int)
// [review comment - Member] Seems worth splitting this out so you don't have
// to no-op it. It isn't even needed - this all compiles fine with SetQuota
// only defined on a non-channel permit.
Count() int
Release(count int)
}
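
// One reading of the review comment above (a hypothetical sketch, not part of
// this diff): split the interface so a channel-based permit never has to no-op
// SetQuota, which the reviewer notes compiles fine with SetQuota only defined
// on a non-channel permit.
//
//	type Permit interface {
//		Acquire(context.Context, int) error
//		Count() int
//		Release(count int)
//	}
//
//	// ResizablePermit would be satisfied only by the non-channel permit.
//	type ResizablePermit interface {
//		Permit
//		Quota() int
//		SetQuota(int)
//	}
//
//	// ChanPermit would add channel-based acquisition where it is needed.
//	type ChanPermit interface {
//		Permit
//		AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
//	}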

type permit struct {
sem semaphore.Semaphore
}

// NewPermit creates a dynamic permit whose quota can be resized at runtime
func NewPermit(initCount int) Permit {
return &permit{sem: semaphore.New(initCount)}
}

// Acquire blocks until a permit is acquired, or returns an error once the context is done
func (p *permit) Acquire(ctx context.Context, count int) error {
if err := p.sem.Acquire(ctx, count); err != nil {
return fmt.Errorf("failed to acquire permit before context is done: %w", err)
}
return nil
}

// AcquireChan returns a channel that receives a value once a permit is acquired;
// if the receiver is gone by then, the permit is released back
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
ch := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := p.sem.Acquire(ctx, 1); err != nil {
return
}
select { // try to send to channel, but don't block if listener is gone
case ch <- struct{}{}:
default:
p.sem.Release(1)
}
}()
return ch
}

func (p *permit) Release(count int) {
p.sem.Release(count)
}

func (p *permit) Quota() int {
return p.sem.GetLimit()
}

func (p *permit) SetQuota(c int) {
p.sem.SetLimit(c)
}

func (p *permit) Count() int {
return p.sem.GetCount()
}
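
A minimal usage sketch of the new permit (the caller code below is hypothetical and would have to live inside this module, since internal packages are not importable from outside):

	import (
		"context"
		"sync"

		"go.uber.org/cadence/internal/worker"
	)

	func example() {
		p := worker.NewPermit(2) // start with a quota of 2
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// blocking acquire around a unit of work
		if err := p.Acquire(ctx, 1); err != nil {
			return // context ended before a permit freed up
		}
		// ... do work ...
		p.Release(1)

		// resize the quota at runtime; subsequent acquires observe the new limit
		p.SetQuota(4)

		// channel-based acquire, usable inside a select
		var wg sync.WaitGroup
		select {
		case <-p.AcquireChan(ctx, &wg): // receives once a permit is held
			p.Release(1) // return it when done
		case <-ctx.Done():
		}
		wg.Wait()
	}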