Skip to content

Commit 244d47d

Browse files
authored
Add activity type and workflow type tags for activity e2e metrics (#458)
* Add activity type and workflow type tags for activity e2e metrics * fix test service * Fix metrics test
1 parent 384ef72 commit 244d47d

File tree

5 files changed

+43
-37
lines changed

5 files changed

+43
-37
lines changed

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public <T> Promise<T> executeActivity(
157157
new WorkflowExecution()
158158
.setWorkflowId("test-workflow-id")
159159
.setRunId(UUID.randomUUID().toString()));
160+
task.setWorkflowType(new WorkflowType().setName("test-workflow"));
160161
task.setActivityType(new ActivityType().setName(activityType));
161162
Result taskResult = activityTaskHandler.handle(task, NoopScope.getInstance(), false);
162163
return Workflow.newPromise(getReply(task, taskResult, resultClass, resultType));

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ private static void scheduleActivityTask(
727727

728728
PollForActivityTaskResponse taskResponse =
729729
new PollForActivityTaskResponse()
730+
.setWorkflowType(data.startWorkflowExecutionRequest.workflowType)
730731
.setActivityType(d.getActivityType())
731732
.setWorkflowExecution(ctx.getExecution())
732733
.setActivityId(d.getActivityId())

src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34-
final class ActivityPollTask implements Poller.PollTask<ActivityWorker.MeasurableActivityTask> {
34+
final class ActivityPollTask implements Poller.PollTask<PollForActivityTaskResponse> {
3535

3636
private final IWorkflowService service;
3737
private final String domain;
@@ -49,10 +49,9 @@ public ActivityPollTask(
4949
}
5050

5151
@Override
52-
public ActivityWorker.MeasurableActivityTask poll() throws TException {
52+
public PollForActivityTaskResponse poll() throws TException {
5353
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1);
5454
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_POLL_LATENCY).start();
55-
Stopwatch e2eSW = options.getMetricsScope().timer(MetricsType.ACTIVITY_E2E_LATENCY).start();
5655

5756
PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest();
5857
pollRequest.setDomain(domain);
@@ -99,6 +98,6 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException {
9998
Duration.ofNanos(
10099
result.getStartedTimestamp() - result.getScheduledTimestampOfThisAttempt()));
101100
sw.stop();
102-
return new ActivityWorker.MeasurableActivityTask(result, e2eSW);
101+
return result;
103102
}
104103
}

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.*;
20+
import com.uber.cadence.BadRequestError;
21+
import com.uber.cadence.DomainNotActiveError;
22+
import com.uber.cadence.EntityNotExistsError;
23+
import com.uber.cadence.Header;
24+
import com.uber.cadence.PollForActivityTaskResponse;
25+
import com.uber.cadence.RespondActivityTaskCanceledRequest;
26+
import com.uber.cadence.RespondActivityTaskCompletedRequest;
27+
import com.uber.cadence.RespondActivityTaskFailedRequest;
28+
import com.uber.cadence.WorkflowExecution;
2129
import com.uber.cadence.common.RetryOptions;
2230
import com.uber.cadence.context.ContextPropagator;
2331
import com.uber.cadence.internal.common.Retryer;
@@ -132,21 +140,8 @@ public boolean isSuspended() {
132140
return poller.isSuspended();
133141
}
134142

135-
static class MeasurableActivityTask {
136-
PollForActivityTaskResponse task;
137-
Stopwatch sw;
138-
139-
MeasurableActivityTask(PollForActivityTaskResponse task, Stopwatch sw) {
140-
this.task = Objects.requireNonNull(task);
141-
this.sw = Objects.requireNonNull(sw);
142-
}
143-
144-
void markDone() {
145-
sw.stop();
146-
}
147-
}
148-
149-
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<MeasurableActivityTask> {
143+
private class TaskHandlerImpl
144+
implements PollTaskExecutor.TaskHandler<PollForActivityTaskResponse> {
150145

151146
final ActivityTaskHandler handler;
152147

@@ -155,43 +150,52 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
155150
}
156151

157152
@Override
158-
public void handle(MeasurableActivityTask task) throws Exception {
153+
public void handle(PollForActivityTaskResponse task) throws Exception {
159154
Scope metricsScope =
160155
options
161156
.getMetricsScope()
162157
.tagged(
163-
ImmutableMap.of(MetricsTag.ACTIVITY_TYPE, task.task.getActivityType().getName()));
158+
ImmutableMap.of(
159+
MetricsTag.ACTIVITY_TYPE,
160+
task.getActivityType().getName(),
161+
MetricsTag.WORKFLOW_TYPE,
162+
task.getWorkflowType().getName()));
163+
164164
metricsScope
165-
.timer(MetricsType.TASK_LIST_QUEUE_LATENCY)
165+
.timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY)
166166
.record(
167167
Duration.ofNanos(
168-
task.task.getStartedTimestamp() - task.task.getScheduledTimestamp()));
168+
task.getStartedTimestamp() - task.getScheduledTimestampOfThisAttempt()));
169169

170170
// The following tags are for logging.
171-
MDC.put(LoggerTag.ACTIVITY_ID, task.task.getActivityId());
172-
MDC.put(LoggerTag.ACTIVITY_TYPE, task.task.getActivityType().getName());
173-
MDC.put(LoggerTag.WORKFLOW_ID, task.task.getWorkflowExecution().getWorkflowId());
174-
MDC.put(LoggerTag.RUN_ID, task.task.getWorkflowExecution().getRunId());
171+
MDC.put(LoggerTag.ACTIVITY_ID, task.getActivityId());
172+
MDC.put(LoggerTag.ACTIVITY_TYPE, task.getActivityType().getName());
173+
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
174+
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
175175

176-
propagateContext(task.task);
176+
propagateContext(task);
177177

178178
try {
179179
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
180-
ActivityTaskHandler.Result response = handler.handle(task.task, metricsScope, false);
180+
ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
181181
sw.stop();
182182

183183
sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
184-
sendReply(task.task, response, metricsScope);
184+
sendReply(task, response, metricsScope);
185185
sw.stop();
186186

187-
task.markDone();
187+
metricsScope
188+
.timer(MetricsType.ACTIVITY_E2E_LATENCY)
189+
.record(
190+
Duration.ofNanos(System.nanoTime() - task.getScheduledTimestampOfThisAttempt()));
191+
188192
} catch (CancellationException e) {
189193
RespondActivityTaskCanceledRequest cancelledRequest =
190194
new RespondActivityTaskCanceledRequest();
191195
cancelledRequest.setDetails(
192196
String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
193197
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
194-
sendReply(task.task, new Result(null, null, cancelledRequest, null), metricsScope);
198+
sendReply(task, new Result(null, null, cancelledRequest, null), metricsScope);
195199
sw.stop();
196200
} finally {
197201
MDC.remove(LoggerTag.ACTIVITY_ID);
@@ -225,17 +229,17 @@ void propagateContext(PollForActivityTaskResponse response) {
225229
}
226230

227231
@Override
228-
public Throwable wrapFailure(MeasurableActivityTask task, Throwable failure) {
229-
WorkflowExecution execution = task.task.getWorkflowExecution();
232+
public Throwable wrapFailure(PollForActivityTaskResponse task, Throwable failure) {
233+
WorkflowExecution execution = task.getWorkflowExecution();
230234
return new RuntimeException(
231235
"Failure processing activity task. WorkflowID="
232236
+ execution.getWorkflowId()
233237
+ ", RunID="
234238
+ execution.getRunId()
235239
+ ", ActivityType="
236-
+ task.task.getActivityType().getName()
240+
+ task.getActivityType().getName()
237241
+ ", ActivityID="
238-
+ task.task.getActivityId(),
242+
+ task.getActivityId(),
239243
failure);
240244
}
241245

src/test/java/com/uber/cadence/workflow/MetricsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ public void testWorkflowMetrics() throws InterruptedException {
252252
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
253253
.put(MetricsTag.TASK_LIST, taskList)
254254
.put(MetricsTag.ACTIVITY_TYPE, "TestActivity::runActivity")
255+
.put(MetricsTag.WORKFLOW_TYPE, "TestWorkflow::execute")
255256
.build();
256257
verify(reporter, times(1))
257258
.reportCounter("cadence-activity-task-completed", activityCompletionTags, 1);

0 commit comments

Comments
 (0)