Skip to content

Commit ff2b799

Browse files
authored
adding more client metrics (#207)
* adding more client metrics * make tagScope take tagName, tagValue
1 parent c9819c3 commit ff2b799

File tree

4 files changed

+31
-19
lines changed

4 files changed

+31
-19
lines changed

client.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,7 @@ func NewClient(service m.TChanWorkflowService, domain string, options *ClientOpt
201201
if options != nil {
202202
metricScope = options.MetricsScope
203203
}
204-
if metricScope == nil {
205-
metricScope = tally.NoopScope
206-
}
204+
metricScope = tagScope(metricScope, tagDomain, domain)
207205
return &workflowClient{
208206
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
209207
domain: domain,
@@ -224,9 +222,7 @@ func NewDomainClient(service m.TChanWorkflowService, options *ClientOptions) Dom
224222
if options != nil {
225223
metricScope = options.MetricsScope
226224
}
227-
if metricScope == nil {
228-
metricScope = tally.NoopScope
229-
}
225+
metricScope = tagScope(metricScope, tagDomain, "domain-client")
230226
return &domainClient{
231227
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
232228
metricsScope: metricScope,

common/metrics/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ const (
6464

6565
UnhandledSignalsCounter = "unhandled-signals"
6666

67+
WorkerStartCounter = "worker-start"
68+
PollerStartCounter = "poller-start"
69+
6770
CadenceRequest = "cadence-request"
6871
CadenceError = "cadence-error"
6972
CadenceLatency = "cadence-latency"

internal_worker.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ func newWorkflowTaskWorkerInternal(
233233
taskWorker: poller,
234234
identity: params.Identity,
235235
workerType: "DecisionWorker"},
236-
params.Logger)
236+
params.Logger,
237+
params.MetricsScope,
238+
)
237239

238240
return &workflowWorker{
239241
executionParameters: params,
@@ -313,6 +315,7 @@ func newActivityTaskWorker(
313315
workerType: "ActivityWorker",
314316
},
315317
workerParams.Logger,
318+
workerParams.MetricsScope,
316319
)
317320

318321
return &activityWorker{
@@ -1026,10 +1029,7 @@ func newAggregatedWorker(
10261029
}
10271030

10281031
ensureRequiredParams(&workerParams)
1029-
tags := map[string]string{
1030-
tagDomain: domain,
1031-
}
1032-
workerParams.MetricsScope = workerParams.MetricsScope.Tagged(tags)
1032+
workerParams.MetricsScope = tagScope(workerParams.MetricsScope, tagDomain, domain)
10331033
workerParams.Logger = workerParams.Logger.With(
10341034
zapcore.Field{Key: tagDomain, Type: zapcore.StringType, String: domain},
10351035
zapcore.Field{Key: tagTaskList, Type: zapcore.StringType, String: taskList},
@@ -1083,6 +1083,14 @@ func newAggregatedWorker(
10831083
}
10841084
}
10851085

1086+
func tagScope(metricsScope tally.Scope, tagName, tagValue string) tally.Scope {
1087+
if metricsScope == nil {
1088+
metricsScope = tally.NoopScope
1089+
}
1090+
tagsMap := map[string]string{tagName: tagValue}
1091+
return metricsScope.Tagged(tagsMap)
1092+
}
1093+
10861094
func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) {
10871095
testTags := getTestTags(wOptions.BackgroundActivityContext)
10881096
if testTags != nil {

internal_worker_base.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/uber-go/tally"
3131
"go.uber.org/cadence/common/backoff"
32+
"go.uber.org/cadence/common/metrics"
3233
"go.uber.org/zap"
3334
"go.uber.org/zap/zapcore"
3435
"golang.org/x/time/rate"
@@ -102,6 +103,7 @@ type (
102103
limiterContextCancel func()
103104
retrier *backoff.ConcurrentRetrier // Service errors back off retrier
104105
logger *zap.Logger
106+
metricsScope tally.Scope
105107

106108
pollerRequestCh chan struct{}
107109
taskQueueCh chan interface{}
@@ -115,16 +117,16 @@ func createPollRetryPolicy() backoff.RetryPolicy {
115117
return policy
116118
}
117119

118-
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger) *baseWorker {
120+
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope) *baseWorker {
119121
ctx, cancel := context.WithCancel(context.Background())
120122
return &baseWorker{
121-
options: options,
122-
shutdownCh: make(chan struct{}),
123-
pollLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 100),
124-
taskLimiter: rate.NewLimiter(rate.Every(options.maxTaskRateRefreshDuration), options.maxTaskRate),
125-
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
126-
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
127-
123+
options: options,
124+
shutdownCh: make(chan struct{}),
125+
pollLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 100),
126+
taskLimiter: rate.NewLimiter(rate.Every(options.maxTaskRateRefreshDuration), options.maxTaskRate),
127+
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
128+
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
129+
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
128130
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
129131
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
130132

@@ -139,6 +141,8 @@ func (bw *baseWorker) Start() {
139141
return
140142
}
141143

144+
bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1)
145+
142146
for i := 0; i < bw.options.pollerCount; i++ {
143147
bw.shutdownWG.Add(1)
144148
go bw.runPoller()
@@ -168,6 +172,7 @@ func (bw *baseWorker) isShutdown() bool {
168172

169173
func (bw *baseWorker) runPoller() {
170174
defer bw.shutdownWG.Done()
175+
bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)
171176

172177
for {
173178
select {

0 commit comments

Comments
 (0)