Skip to content

Commit e1b86f6

Browse files
authored
Add missing metrics and tags (#395)
1 parent a8bcc17 commit e1b86f6

12 files changed

+76
-40
lines changed

src/main/java/com/uber/cadence/activity/ActivityTask.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,5 @@ public interface ActivityTask {
6464

6565
String getWorkflowDomain();
6666

67-
String getTaskList();
68-
6967
int getAttempt();
7068
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ public WorkflowExecution startWorkflow(StartWorkflowExecutionParameters startPar
7878
} finally {
7979
// TODO: can probably cache this
8080
Map<String, String> tags =
81-
new ImmutableMap.Builder<String, String>(1)
81+
new ImmutableMap.Builder<String, String>(3)
8282
.put(MetricsTag.WORKFLOW_TYPE, startParameters.getWorkflowType().getName())
83+
.put(MetricsTag.TASK_LIST, startParameters.getTaskList())
84+
.put(MetricsTag.DOMAIN, domain)
8385
.build();
8486
metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_START_COUNTER).inc(1);
8587
}
@@ -203,6 +205,23 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
203205
@Override
204206
public WorkflowExecution signalWithStartWorkflowExecution(
205207
SignalWithStartWorkflowExecutionParameters parameters) {
208+
try {
209+
return signalWithStartWorkflowInternal(parameters);
210+
} finally {
211+
Map<String, String> tags =
212+
new ImmutableMap.Builder<String, String>(3)
213+
.put(
214+
MetricsTag.WORKFLOW_TYPE,
215+
parameters.getStartParameters().getWorkflowType().getName())
216+
.put(MetricsTag.TASK_LIST, parameters.getStartParameters().getTaskList())
217+
.put(MetricsTag.DOMAIN, domain)
218+
.build();
219+
metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_SIGNAL_WITH_START_COUNTER).inc(1);
220+
}
221+
}
222+
223+
private WorkflowExecution signalWithStartWorkflowInternal(
224+
SignalWithStartWorkflowExecutionParameters parameters) {
206225
SignalWithStartWorkflowExecutionRequest request = new SignalWithStartWorkflowExecutionRequest();
207226
request.setDomain(domain);
208227
StartWorkflowExecutionParameters startParameters = parameters.getStartParameters();

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientFactoryImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.converter.DataConverter;
22+
import com.uber.cadence.internal.metrics.MetricsTag;
2223
import com.uber.cadence.serviceclient.IWorkflowService;
2324
import com.uber.m3.tally.Scope;
25+
import com.uber.m3.util.ImmutableMap;
26+
import java.util.Map;
2427

2528
public class ManualActivityCompletionClientFactoryImpl
2629
extends ManualActivityCompletionClientFactory {
@@ -35,7 +38,10 @@ public ManualActivityCompletionClientFactoryImpl(
3538
this.service = service;
3639
this.domain = domain;
3740
this.dataConverter = dataConverter;
38-
this.metricsScope = metricsScope;
41+
42+
Map<String, String> tags =
43+
new ImmutableMap.Builder<String, String>(1).put(MetricsTag.DOMAIN, domain).build();
44+
this.metricsScope = metricsScope.tagged(tags);
3945
}
4046

4147
public IWorkflowService getService() {

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class MetricsType {
113113
CADENCE_METRICS_PREFIX + "local-activity-canceled";
114114
public static final String LOCAL_ACTIVITY_FAILED_COUNTER =
115115
CADENCE_METRICS_PREFIX + "local-activity-failed";
116-
public static final String LOCAL_ACTIVITY_PANIC_COUNTER =
116+
public static final String LOCAL_ACTIVITY_ERROR_COUNTER =
117117
CADENCE_METRICS_PREFIX + "local-activity-panic";
118118
public static final String LOCAL_ACTIVITY_EXECUTION_LATENCY =
119119
CADENCE_METRICS_PREFIX + "local-activity-execution-latency";

src/main/java/com/uber/cadence/internal/metrics/ReplayAwareScope.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
import java.util.function.Supplier;
3636

3737
public class ReplayAwareScope implements Scope {
38-
private Scope scope;
39-
private ReplayAware context;
40-
private Supplier<Long> clock;
38+
private final Scope scope;
39+
private final ReplayAware context;
40+
private final Supplier<Long> clock;
4141

4242
public ReplayAwareScope(Scope scope, ReplayAware context, Supplier<Long> clock) {
4343
this.scope = Objects.requireNonNull(scope);

src/main/java/com/uber/cadence/internal/sync/ActivityTaskImpl.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@
2626

2727
final class ActivityTaskImpl implements ActivityTask {
2828
private final PollForActivityTaskResponse response;
29-
private final String taskList;
3029

31-
ActivityTaskImpl(PollForActivityTaskResponse response, String taskList) {
30+
ActivityTaskImpl(PollForActivityTaskResponse response) {
3231
this.response = response;
33-
this.taskList = taskList;
3432
}
3533

3634
@Override
@@ -89,11 +87,6 @@ public String getWorkflowDomain() {
8987
return response.getWorkflowDomain();
9088
}
9189

92-
@Override
93-
public String getTaskList() {
94-
return taskList;
95-
}
96-
9790
@Override
9891
public int getAttempt() {
9992
return response.getAttempt();

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@
2828
import com.uber.cadence.converter.DataConverter;
2929
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
3030
import com.uber.cadence.internal.common.InternalUtils;
31-
import com.uber.cadence.internal.metrics.MetricsTag;
3231
import com.uber.cadence.internal.metrics.MetricsType;
3332
import com.uber.cadence.internal.worker.ActivityTaskHandler;
3433
import com.uber.cadence.serviceclient.IWorkflowService;
3534
import com.uber.cadence.testing.SimulatedTimeoutException;
3635
import com.uber.m3.tally.Scope;
37-
import com.uber.m3.util.ImmutableMap;
3836
import java.lang.reflect.InvocationTargetException;
3937
import java.lang.reflect.Method;
4038
import java.util.Collections;
@@ -112,9 +110,12 @@ private void addActivityImplementation(
112110
}
113111

114112
private ActivityTaskHandler.Result mapToActivityFailure(
115-
String activityType, Throwable failure, Scope metricsScope) {
113+
Throwable failure, Scope metricsScope, boolean isLocalActivity) {
116114

117115
if (failure instanceof ActivityCancelledException) {
116+
if (isLocalActivity) {
117+
metricsScope.counter(MetricsType.LOCAL_ACTIVITY_CANCELED_COUNTER).inc(1);
118+
}
118119
throw new CancellationException(failure.getMessage());
119120
}
120121

@@ -127,16 +128,21 @@ private ActivityTaskHandler.Result mapToActivityFailure(
127128
dataConverter.toData(timeoutException.getDetails()));
128129
}
129130

130-
Map<String, String> activityTypeTag =
131-
new ImmutableMap.Builder<String, String>(1)
132-
.put(MetricsTag.ACTIVITY_TYPE, activityType)
133-
.build();
134131
if (failure instanceof Error) {
135-
metricsScope.tagged(activityTypeTag).counter(MetricsType.ACTIVITY_TASK_ERROR_COUNTER).inc(1);
132+
if (isLocalActivity) {
133+
metricsScope.counter(MetricsType.LOCAL_ACTIVITY_ERROR_COUNTER).inc(1);
134+
} else {
135+
metricsScope.counter(MetricsType.ACTIVITY_TASK_ERROR_COUNTER).inc(1);
136+
}
136137
throw (Error) failure;
137138
}
138139

139-
metricsScope.tagged(activityTypeTag).counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
140+
if (isLocalActivity) {
141+
metricsScope.counter(MetricsType.LOCAL_ACTIVITY_FAILED_COUNTER).inc(1);
142+
} else {
143+
metricsScope.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
144+
}
145+
140146
RespondActivityTaskFailedRequest result = new RespondActivityTaskFailedRequest();
141147
failure = CheckedExceptionWrapper.unwrap(failure);
142148
result.setReason(failure.getClass().getName());
@@ -166,20 +172,20 @@ void setLocalActivitiesImplementation(Object[] activitiesImplementation) {
166172

167173
@Override
168174
public Result handle(
169-
String taskList, PollForActivityTaskResponse pollResponse, Scope metricsScope) {
175+
PollForActivityTaskResponse pollResponse, Scope metricsScope, boolean isLocalActivity) {
170176
String activityType = pollResponse.getActivityType().getName();
171-
ActivityTaskImpl activityTask = new ActivityTaskImpl(pollResponse, taskList);
177+
ActivityTaskImpl activityTask = new ActivityTaskImpl(pollResponse);
172178
ActivityTaskExecutor activity = activities.get(activityType);
173179
if (activity == null) {
174180
String knownTypes = Joiner.on(", ").join(activities.keySet());
175181
return mapToActivityFailure(
176-
activityType,
177182
new IllegalArgumentException(
178183
"Activity Type \""
179184
+ activityType
180185
+ "\" is not registered with a worker. Known types are: "
181186
+ knownTypes),
182-
metricsScope);
187+
metricsScope,
188+
isLocalActivity);
183189
}
184190
return activity.execute(activityTask, metricsScope);
185191
}
@@ -215,9 +221,9 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
215221
}
216222
return new ActivityTaskHandler.Result(request, null, null, null);
217223
} catch (RuntimeException | IllegalAccessException e) {
218-
return mapToActivityFailure(task.getActivityType(), e, metricsScope);
224+
return mapToActivityFailure(e, metricsScope, false);
219225
} catch (InvocationTargetException e) {
220-
return mapToActivityFailure(task.getActivityType(), e.getTargetException(), metricsScope);
226+
return mapToActivityFailure(e.getTargetException(), metricsScope, false);
221227
} finally {
222228
CurrentActivityExecutionContext.unset();
223229
}
@@ -248,9 +254,9 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
248254
}
249255
return new ActivityTaskHandler.Result(request, null, null, null);
250256
} catch (RuntimeException | IllegalAccessException e) {
251-
return mapToActivityFailure(task.getActivityType(), e, metricsScope);
257+
return mapToActivityFailure(e, metricsScope, true);
252258
} catch (InvocationTargetException e) {
253-
return mapToActivityFailure(task.getActivityType(), e.getTargetException(), metricsScope);
259+
return mapToActivityFailure(e.getTargetException(), metricsScope, true);
254260
} finally {
255261
CurrentActivityExecutionContext.unset();
256262
}

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,7 @@ public <T> Promise<T> executeActivity(
221221
.setWorkflowId("test-workflow-id")
222222
.setRunId(UUID.randomUUID().toString()));
223223
task.setActivityType(new ActivityType().setName(activityType));
224-
Result taskResult =
225-
activityTaskHandler.handle(options.getTaskList(), task, NoopScope.getInstance());
224+
Result taskResult = activityTaskHandler.handle(task, NoopScope.getInstance(), false);
226225
return Workflow.newPromise(getReply(task, taskResult, resultClass, resultType));
227226
}
228227

src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public Duration getBackoff() {
117117
* @param activityTask activity task which is response to PollForActivityTask call.
118118
* @return One of the possible decision task replies.
119119
*/
120-
Result handle(String taskList, PollForActivityTaskResponse activityTask, Scope metricsScope);
120+
Result handle(
121+
PollForActivityTaskResponse activityTask, Scope metricsScope, boolean isLocalActivity);
121122

122123
/** True if this handler handles at least one activity type. */
123124
boolean isAnyTypeSupported();

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void handle(MeasurableActivityTask task) throws Exception {
172172

173173
try {
174174
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
175-
ActivityTaskHandler.Result response = handler.handle(taskList, task.task, metricsScope);
175+
ActivityTaskHandler.Result response = handler.handle(task.task, metricsScope, false);
176176
sw.stop();
177177

178178
sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();

0 commit comments

Comments
 (0)