|
23 | 23 | import com.uber.cadence.converter.DataConverterException;
|
24 | 24 | import com.uber.cadence.internal.common.CheckedExceptionWrapper;
|
25 | 25 | import com.uber.cadence.internal.common.InternalUtils;
|
| 26 | +import com.uber.cadence.internal.metrics.MetricsType; |
26 | 27 | import com.uber.cadence.internal.replay.ReplayWorkflow;
|
27 | 28 | import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
|
28 | 29 | import com.uber.cadence.internal.worker.WorkflowExecutionException;
|
|
35 | 36 | import com.uber.cadence.workflow.WorkflowInfo;
|
36 | 37 | import com.uber.cadence.workflow.WorkflowInterceptor;
|
37 | 38 | import com.uber.cadence.workflow.WorkflowMethod;
|
| 39 | +import com.uber.m3.tally.Scope; |
38 | 40 | import java.lang.reflect.InvocationTargetException;
|
39 | 41 | import java.lang.reflect.Method;
|
40 | 42 | import java.util.Collections;
|
@@ -64,14 +66,17 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
|
64 | 66 | Collections.synchronizedMap(new HashMap<>());
|
65 | 67 |
|
66 | 68 | private final ExecutorService threadPool;
|
| 69 | + private final Scope metricsScope; |
67 | 70 |
|
68 | 71 | POJOWorkflowImplementationFactory(
|
69 | 72 | DataConverter dataConverter,
|
70 | 73 | ExecutorService threadPool,
|
71 |
| - Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory) { |
| 74 | + Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory, |
| 75 | + Scope metricsScope) { |
72 | 76 | this.dataConverter = Objects.requireNonNull(dataConverter);
|
73 | 77 | this.threadPool = Objects.requireNonNull(threadPool);
|
74 | 78 | this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
|
| 79 | + this.metricsScope = metricsScope; |
75 | 80 | }
|
76 | 81 |
|
77 | 82 | void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {
|
@@ -295,6 +300,7 @@ public void processSignal(String signalName, byte[] input, long eventId) {
|
295 | 300 | + eventId
|
296 | 301 | + ". Dropping it.",
|
297 | 302 | targetException);
|
| 303 | + metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1); |
298 | 304 | } else {
|
299 | 305 | throw new Error(
|
300 | 306 | "Failure processing \"" + signalName + "\" at eventID " + eventId, targetException);
|
|
0 commit comments