Skip to content

Commit 7087b88

Browse files
committed
address most comments except for max poller settings
1 parent 4b74bcf commit 7087b88

File tree

5 files changed

+15
-10
lines changed

5 files changed

+15
-10
lines changed

internal/internal_worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,7 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
12901290
}
12911291

12921292
func augmentAutoScalerOptions(options AutoScalerOptions) AutoScalerOptions {
1293+
// need at least 2 for sticky task polling to work
12931294
if options.PollerMinCount <= 1 {
12941295
options.PollerMinCount = defaultMinPollerSize
12951296
}

internal/internal_worker_base.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"github.com/jonboulle/clockwork"
3939
"github.com/uber-go/tally"
4040
"go.uber.org/zap"
41-
"go.uber.org/zap/zapcore"
4241
"golang.org/x/time/rate"
4342

4443
"go.uber.org/cadence/.gen/go/shared"
@@ -168,7 +167,7 @@ func createPollRetryPolicy() backoff.RetryPolicy {
168167

169168
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
170169
ctx, cancel := context.WithCancel(context.Background())
171-
logger = logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType})
170+
logger = logger.With(zap.String(tagWorkerType, options.workerType))
172171
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)
173172

174173
concurrency := &worker.ConcurrencyLimit{

internal/internal_worker_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,10 +1551,12 @@ func TestGetTaskAutoConfigHint(t *testing.T) {
15511551
},
15521552
} {
15531553
t.Run(tt.name, func(t *testing.T) {
1554-
if task, ok := tt.task.(autoConfigHintAwareTask); ok {
1555-
assert.Equal(t, tt.want, task.getAutoConfigHint())
1554+
task, ok := tt.task.(autoConfigHintAwareTask)
1555+
if tt.want == nil {
1556+
assert.False(t, ok)
15561557
} else {
1557-
assert.Nil(t, tt.want)
1558+
assert.True(t, ok)
1559+
assert.Equal(t, tt.want, task.getAutoConfigHint())
15581560
}
15591561
})
15601562
}

internal/worker/concurrency_auto_scaler.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ const (
4848
autoScalerEventStart autoScalerEvent = "auto-scaler-start"
4949
autoScalerEventStop autoScalerEvent = "auto-scaler-stop"
5050
autoScalerEventLogMsg string = "concurrency auto scaler event"
51-
testTimeFormat string = "15:04:05"
5251

5352
metricsEnabled = "enabled"
5453
metricsDisabled = "disabled"
@@ -136,12 +135,12 @@ func (c *ConcurrencyAutoScaler) Start() {
136135
c.wg.Add(1)
137136

138137
go func() {
138+
defer c.wg.Done()
139139
defer func() {
140140
if r := recover(); r != nil {
141141
c.log.Error("panic in concurrency auto scaler, stopping the auto scaler", zap.Any("error", r))
142142
}
143143
}()
144-
defer c.wg.Done()
145144
ticker := c.clock.NewTicker(c.updateTick)
146145
defer ticker.Stop()
147146
for {
@@ -162,9 +161,11 @@ func (c *ConcurrencyAutoScaler) Stop() {
162161
if c == nil {
163162
return // no-op if auto scaler is not set
164163
}
164+
c.lock.Lock()
165+
c.logEvent(autoScalerEventStop)
166+
c.lock.Unlock()
165167
close(c.shutdownChan)
166168
c.wg.Wait()
167-
c.logEvent(autoScalerEventStop)
168169
}
169170

170171
// ProcessPollerHint reads the poller response hint and take actions in a transactional way

internal/worker/concurrency_auto_scaler_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ import (
3939
)
4040

4141
const (
42-
testTickTime = 1 * time.Second
42+
testTickTime = 1 * time.Second
43+
testTimeFormat = time.TimeOnly
4344
)
4445

4546
func createTestConcurrencyAutoScaler(t *testing.T, logger *zap.Logger, clock clockwork.Clock) *ConcurrencyAutoScaler {
@@ -303,11 +304,12 @@ func TestConcurrencyAutoScaler(t *testing.T) {
303304
for _, event := range obs.FilterMessage(autoScalerEventLogMsg).All() {
304305
if event.ContextMap()["event"] != string(autoScalerEventEmitMetrics) {
305306
t.Log("event: ", event.ContextMap())
307+
306308
actualEvents = append(actualEvents, eventLog{
307309
eventType: autoScalerEvent(event.ContextMap()["event"].(string)),
308310
enabled: event.ContextMap()["enabled"].(bool),
309311
pollerQuota: event.ContextMap()["poller_quota"].(int64),
310-
time: event.ContextMap()["time"].(time.Time).Format(testTimeFormat),
312+
time: event.Time.Format(testTimeFormat),
311313
})
312314
}
313315
}

0 commit comments

Comments
 (0)