Skip to content

Commit 33b0302

Browse files
authored
Add missing metrics tags for activity and decision task processing (#292)
1 parent 55446d4 commit 33b0302

File tree

3 files changed

+72
-16
lines changed

3 files changed

+72
-16
lines changed

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import com.uber.cadence.common.RetryOptions;
2222
import com.uber.cadence.internal.common.Retryer;
2323
import com.uber.cadence.internal.logging.LoggerTag;
24+
import com.uber.cadence.internal.metrics.MetricsTag;
2425
import com.uber.cadence.internal.metrics.MetricsType;
2526
import com.uber.cadence.internal.worker.ActivityTaskHandler.Result;
2627
import com.uber.cadence.serviceclient.IWorkflowService;
28+
import com.uber.m3.tally.Scope;
2729
import com.uber.m3.tally.Stopwatch;
2830
import com.uber.m3.util.Duration;
31+
import com.uber.m3.util.ImmutableMap;
2932
import java.nio.charset.StandardCharsets;
3033
import java.util.Objects;
3134
import java.util.concurrent.CancellationException;
@@ -157,26 +160,31 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
157160

158161
@Override
159162
public void handle(MeasurableActivityTask task) throws Exception {
160-
options
161-
.getMetricsScope()
163+
Scope metricsScope =
164+
options
165+
.getMetricsScope()
166+
.tagged(
167+
ImmutableMap.of(MetricsTag.ACTIVITY_TYPE, task.task.getActivityType().getName()));
168+
metricsScope
162169
.timer(MetricsType.TASK_LIST_QUEUE_LATENCY)
163170
.record(
164171
Duration.ofNanos(
165172
task.task.getStartedTimestamp() - task.task.getScheduledTimestamp()));
166173

174+
// The following tags are for logging.
167175
MDC.put(LoggerTag.ACTIVITY_ID, task.task.getActivityId());
168176
MDC.put(LoggerTag.ACTIVITY_TYPE, task.task.getActivityType().getName());
169177
MDC.put(LoggerTag.WORKFLOW_ID, task.task.getWorkflowExecution().getWorkflowId());
170178
MDC.put(LoggerTag.RUN_ID, task.task.getWorkflowExecution().getRunId());
171179

172180
try {
173-
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
181+
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
174182
ActivityTaskHandler.Result response =
175-
handler.handle(service, domain, task.task, options.getMetricsScope());
183+
handler.handle(service, domain, task.task, metricsScope);
176184
sw.stop();
177185

178-
sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
179-
sendReply(task.task, response);
186+
sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
187+
sendReply(task.task, response, metricsScope);
180188
sw.stop();
181189

182190
task.markDone();
@@ -185,8 +193,8 @@ public void handle(MeasurableActivityTask task) throws Exception {
185193
new RespondActivityTaskCanceledRequest();
186194
cancelledRequest.setDetails(
187195
String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
188-
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
189-
sendReply(task.task, new Result(null, null, cancelledRequest, null));
196+
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
197+
sendReply(task.task, new Result(null, null, cancelledRequest, null), metricsScope);
190198
sw.stop();
191199
} finally {
192200
MDC.remove(LoggerTag.ACTIVITY_ID);
@@ -211,7 +219,8 @@ public Throwable wrapFailure(MeasurableActivityTask task, Throwable failure) {
211219
failure);
212220
}
213221

214-
private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Result response)
222+
private void sendReply(
223+
PollForActivityTaskResponse task, ActivityTaskHandler.Result response, Scope metricsScope)
215224
throws TException {
216225
RetryOptions ro = response.getRequestRetryOptions();
217226
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
@@ -225,7 +234,7 @@ private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Res
225234
taskCompleted.setTaskToken(task.getTaskToken());
226235
taskCompleted.setIdentity(options.getIdentity());
227236
Retryer.retry(ro, () -> service.RespondActivityTaskCompleted(taskCompleted));
228-
options.getMetricsScope().counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
237+
metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
229238
} else {
230239
RespondActivityTaskFailedRequest taskFailed = response.getTaskFailed();
231240
if (taskFailed != null) {
@@ -240,7 +249,7 @@ private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Res
240249
taskFailed.setTaskToken(task.getTaskToken());
241250
taskFailed.setIdentity(options.getIdentity());
242251
Retryer.retry(ro, () -> service.RespondActivityTaskFailed(taskFailed));
243-
options.getMetricsScope().counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1);
252+
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1);
244253
} else {
245254
RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled();
246255
if (taskCancelled != null) {
@@ -255,7 +264,7 @@ private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Res
255264
EntityNotExistsError.class,
256265
DomainNotActiveError.class);
257266
Retryer.retry(ro, () -> service.RespondActivityTaskCanceled(taskCancelled));
258-
options.getMetricsScope().counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1);
267+
metricsScope.counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1);
259268
}
260269
}
261270
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import com.uber.cadence.internal.common.Retryer;
2525
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2626
import com.uber.cadence.internal.logging.LoggerTag;
27+
import com.uber.cadence.internal.metrics.MetricsTag;
2728
import com.uber.cadence.internal.metrics.MetricsType;
2829
import com.uber.cadence.serviceclient.IWorkflowService;
30+
import com.uber.m3.tally.Scope;
2931
import com.uber.m3.tally.Stopwatch;
32+
import com.uber.m3.util.ImmutableMap;
3033
import java.util.Arrays;
3134
import java.util.List;
3235
import java.util.Objects;
@@ -228,20 +231,24 @@ private TaskHandlerImpl(DecisionTaskHandler handler) {
228231

229232
@Override
230233
public void handle(PollForDecisionTaskResponse task) throws Exception {
234+
Scope metricsScope =
235+
options
236+
.getMetricsScope()
237+
.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName()));
238+
231239
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
232240
MDC.put(LoggerTag.WORKFLOW_TYPE, task.getWorkflowType().getName());
233241
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
234242
try {
235-
Stopwatch sw =
236-
options.getMetricsScope().timer(MetricsType.DECISION_EXECUTION_LATENCY).start();
243+
Stopwatch sw = metricsScope.timer(MetricsType.DECISION_EXECUTION_LATENCY).start();
237244
DecisionTaskHandler.Result response = handler.handleDecisionTask(task);
238245
sw.stop();
239246

240-
sw = options.getMetricsScope().timer(MetricsType.DECISION_RESPONSE_LATENCY).start();
247+
sw = metricsScope.timer(MetricsType.DECISION_RESPONSE_LATENCY).start();
241248
sendReply(service, task.getTaskToken(), response);
242249
sw.stop();
243250

244-
options.getMetricsScope().counter(MetricsType.DECISION_TASK_COMPLETED_COUNTER).inc(1);
251+
metricsScope.counter(MetricsType.DECISION_TASK_COMPLETED_COUNTER).inc(1);
245252
} finally {
246253
MDC.remove(LoggerTag.WORKFLOW_ID);
247254
MDC.remove(LoggerTag.WORKFLOW_TYPE);

src/test/java/com/uber/cadence/workflow/MetricsTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import static org.mockito.Matchers.eq;
2323
import static org.mockito.Mockito.*;
2424

25+
import com.uber.cadence.activity.ActivityOptions;
2526
import com.uber.cadence.client.WorkflowClient;
2627
import com.uber.cadence.client.WorkflowOptions;
28+
import com.uber.cadence.common.RetryOptions;
2729
import com.uber.cadence.internal.metrics.MetricsTag;
2830
import com.uber.cadence.internal.metrics.MetricsType;
2931
import com.uber.cadence.testing.TestEnvironmentOptions;
@@ -73,6 +75,22 @@ public static class TestMetricsInWorkflow implements TestWorkflow {
7375
public void execute() {
7476
Workflow.getMetricsScope().counter("test-started").inc(1);
7577

78+
ActivityOptions activityOptions =
79+
new ActivityOptions.Builder()
80+
.setTaskList(taskList)
81+
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
82+
.setRetryOptions(
83+
new RetryOptions.Builder()
84+
.setExpiration(Duration.ofSeconds(100))
85+
.setMaximumInterval(Duration.ofSeconds(1))
86+
.setInitialInterval(Duration.ofSeconds(1))
87+
.setMaximumAttempts(3)
88+
.setDoNotRetry(AssertionError.class)
89+
.build())
90+
.build();
91+
TestActivity activity = Workflow.newActivityStub(TestActivity.class, activityOptions);
92+
activity.runActivity(1);
93+
7694
ChildWorkflowOptions options =
7795
new ChildWorkflowOptions.Builder().setTaskList(taskList).build();
7896
TestChildWorkflow workflow = Workflow.newChildWorkflowStub(TestChildWorkflow.class, options);
@@ -82,6 +100,17 @@ public void execute() {
82100
}
83101
}
84102

103+
public interface TestActivity {
104+
int runActivity(int input);
105+
}
106+
107+
static class TestActivityImpl implements TestActivity {
108+
@Override
109+
public int runActivity(int input) {
110+
return input;
111+
}
112+
}
113+
85114
public interface TestChildWorkflow {
86115

87116
@WorkflowMethod
@@ -181,6 +210,7 @@ public void testWorkflowMetrics() throws InterruptedException {
181210
Worker worker = testEnvironment.newWorker(taskList);
182211
worker.registerWorkflowImplementationTypes(
183212
TestMetricsInWorkflow.class, TestMetricsInChildWorkflow.class);
213+
worker.registerActivitiesImplementations(new TestActivityImpl());
184214
testEnvironment.start();
185215

186216
WorkflowClient workflowClient = testEnvironment.newWorkflowClient();
@@ -216,6 +246,16 @@ public void testWorkflowMetrics() throws InterruptedException {
216246
assertTrue(
217247
sleepDuration.toString(),
218248
sleepDuration.compareTo(com.uber.m3.util.Duration.ofMillis(3100)) < 0);
249+
250+
Map<String, String> activityCompletionTags =
251+
new ImmutableMap.Builder<String, String>(2)
252+
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
253+
.put(MetricsTag.TASK_LIST, taskList)
254+
.put(MetricsTag.ACTIVITY_TYPE, "TestActivity::runActivity")
255+
.build();
256+
verify(reporter, times(1))
257+
.reportCounter("cadence-activity-task-completed", activityCompletionTags, 1);
258+
219259
testEnvironment.close();
220260
}
221261

0 commit comments

Comments
 (0)