72
72
service workflowserviceclient.Interface
73
73
taskHandler WorkflowTaskHandler
74
74
ldaTunnel * locallyDispatchedActivityTunnel
75
- metricsScope tally. Scope
75
+ metricsScope * metrics. TaggedScope
76
76
logger * zap.Logger
77
77
78
78
stickyUUID string
@@ -276,7 +276,7 @@ func newWorkflowTaskPoller(
276
276
identity : params .Identity ,
277
277
taskHandler : taskHandler ,
278
278
ldaTunnel : ldaTunnel ,
279
- metricsScope : params .MetricsScope ,
279
+ metricsScope : metrics . NewTaggedScope ( params .MetricsScope ) ,
280
280
logger : params .Logger ,
281
281
stickyUUID : uuid .New (),
282
282
disableStickyExecution : params .DisableStickyExecution ,
@@ -385,8 +385,9 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
385
385
386
386
func (wtp * workflowTaskPoller ) RespondTaskCompletedWithMetrics (completedRequest interface {}, taskErr error , task * s.PollForDecisionTaskResponse , startTime time.Time ) (response * s.RespondDecisionTaskCompletedResponse , err error ) {
387
387
388
+ metricsScope := wtp .metricsScope .GetTaggedScope (tagWorkflowType , task .WorkflowType .GetName ())
388
389
if taskErr != nil {
389
- wtp . metricsScope .Counter (metrics .DecisionExecutionFailedCounter ).Inc (1 )
390
+ metricsScope .Counter (metrics .DecisionExecutionFailedCounter ).Inc (1 )
390
391
wtp .logger .Warn ("Failed to process decision task." ,
391
392
zap .String (tagWorkflowType , task .WorkflowType .GetName ()),
392
393
zap .String (tagWorkflowID , task .WorkflowExecution .GetWorkflowId ()),
@@ -395,17 +396,17 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
395
396
// convert err to DecisionTaskFailed
396
397
completedRequest = errorToFailDecisionTask (task .TaskToken , taskErr , wtp .identity )
397
398
} else {
398
- wtp . metricsScope .Counter (metrics .DecisionTaskCompletedCounter ).Inc (1 )
399
+ metricsScope .Counter (metrics .DecisionTaskCompletedCounter ).Inc (1 )
399
400
}
400
401
401
- wtp . metricsScope .Timer (metrics .DecisionExecutionLatency ).Record (time .Now ().Sub (startTime ))
402
+ metricsScope .Timer (metrics .DecisionExecutionLatency ).Record (time .Now ().Sub (startTime ))
402
403
403
404
responseStartTime := time .Now ()
404
405
if response , err = wtp .RespondTaskCompleted (completedRequest , task ); err != nil {
405
- wtp . metricsScope .Counter (metrics .DecisionResponseFailedCounter ).Inc (1 )
406
+ metricsScope .Counter (metrics .DecisionResponseFailedCounter ).Inc (1 )
406
407
return
407
408
}
408
- wtp . metricsScope .Timer (metrics .DecisionResponseLatency ).Record (time .Now ().Sub (responseStartTime ))
409
+ metricsScope .Timer (metrics .DecisionResponseLatency ).Record (time .Now ().Sub (responseStartTime ))
409
410
410
411
return
411
412
}
@@ -787,11 +788,12 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
787
788
zap .Bool ("IsQueryTask" , response .Query != nil ))
788
789
})
789
790
790
- wtp .metricsScope .Counter (metrics .DecisionPollSucceedCounter ).Inc (1 )
791
- wtp .metricsScope .Timer (metrics .DecisionPollLatency ).Record (time .Now ().Sub (startTime ))
791
+ metricsScope := wtp .metricsScope .GetTaggedScope (tagWorkflowType , response .WorkflowType .GetName ())
792
+ metricsScope .Counter (metrics .DecisionPollSucceedCounter ).Inc (1 )
793
+ metricsScope .Timer (metrics .DecisionPollLatency ).Record (time .Now ().Sub (startTime ))
792
794
793
795
scheduledToStartLatency := time .Duration (response .GetStartedTimestamp () - response .GetScheduledTimestamp ())
794
- wtp . metricsScope .Timer (metrics .DecisionScheduledToStartLatency ).Record (scheduledToStartLatency )
796
+ metricsScope .Timer (metrics .DecisionScheduledToStartLatency ).Record (scheduledToStartLatency )
795
797
return task , nil
796
798
}
797
799
@@ -966,11 +968,14 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context,
966
968
return & activityTask {}, nil
967
969
}
968
970
969
- atp .metricsScope .Counter (metrics .ActivityPollSucceedCounter ).Inc (1 )
970
- atp .metricsScope .Timer (metrics .ActivityPollLatency ).Record (time .Now ().Sub (startTime ))
971
+ workflowType := response .WorkflowType .GetName ()
972
+ activityType := response .ActivityType .GetName ()
973
+ metricsScope := getMetricsScopeForActivity (atp .metricsScope , workflowType , activityType )
974
+ metricsScope .Counter (metrics .ActivityPollSucceedCounter ).Inc (1 )
975
+ metricsScope .Timer (metrics .ActivityPollLatency ).Record (time .Now ().Sub (startTime ))
971
976
972
977
scheduledToStartLatency := time .Duration (response .GetStartedTimestamp () - response .GetScheduledTimestampOfThisAttempt ())
973
- atp . metricsScope .Timer (metrics .ActivityScheduledToStartLatency ).Record (scheduledToStartLatency )
978
+ metricsScope .Timer (metrics .ActivityScheduledToStartLatency ).Record (scheduledToStartLatency )
974
979
975
980
return & activityTask {task : response , pollStartTime : startTime }, nil
976
981
}
0 commit comments