Skip to content

Commit 7d406dc

Browse files
authored
Merge pull request #180 from halakaraki/add-metrics-corrupted-signals
add metric for corrupted signal
2 parents 0ee4f65 + 0307723 commit 7d406dc

File tree

7 files changed

+294
-25
lines changed

7 files changed

+294
-25
lines changed

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ public class MetricsType {
121121
public static final String UNHANDLED_SIGNALS_COUNTER =
122122
CADENCE_METRICS_PREFIX + "unhandled-signals";
123123

124+
public static final String CORRUPTED_SIGNALS_COUNTER =
125+
CADENCE_METRICS_PREFIX + "corrupted-signals";
126+
124127
public static final String WORKER_START_COUNTER = CADENCE_METRICS_PREFIX + "worker-start";
125128
public static final String POLLER_START_COUNTER = CADENCE_METRICS_PREFIX + "poller-start";
126129

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.converter.DataConverterException;
2424
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2525
import com.uber.cadence.internal.common.InternalUtils;
26+
import com.uber.cadence.internal.metrics.MetricsType;
2627
import com.uber.cadence.internal.replay.ReplayWorkflow;
2728
import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
2829
import com.uber.cadence.internal.worker.WorkflowExecutionException;
@@ -35,6 +36,7 @@
3536
import com.uber.cadence.workflow.WorkflowInfo;
3637
import com.uber.cadence.workflow.WorkflowInterceptor;
3738
import com.uber.cadence.workflow.WorkflowMethod;
39+
import com.uber.m3.tally.Scope;
3840
import java.lang.reflect.InvocationTargetException;
3941
import java.lang.reflect.Method;
4042
import java.util.Collections;
@@ -64,14 +66,17 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
6466
Collections.synchronizedMap(new HashMap<>());
6567

6668
private final ExecutorService threadPool;
69+
private final Scope metricsScope;
6770

6871
POJOWorkflowImplementationFactory(
6972
DataConverter dataConverter,
7073
ExecutorService threadPool,
71-
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory) {
74+
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
75+
Scope metricsScope) {
7276
this.dataConverter = Objects.requireNonNull(dataConverter);
7377
this.threadPool = Objects.requireNonNull(threadPool);
7478
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
79+
this.metricsScope = metricsScope;
7580
}
7681

7782
void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {
@@ -279,22 +284,19 @@ public void processSignal(String signalName, byte[] input, long eventId) {
279284
+ signalHandlers.keySet());
280285
return;
281286
}
282-
Object[] args = dataConverter.fromDataArray(input, signalMethod.getGenericParameterTypes());
287+
283288
try {
289+
Object[] args = dataConverter.fromDataArray(input, signalMethod.getGenericParameterTypes());
284290
newInstance();
285291
signalMethod.invoke(workflow, args);
286292
} catch (IllegalAccessException e) {
287293
throw new Error("Failure processing \"" + signalName + "\" at eventID " + eventId, e);
294+
} catch (DataConverterException e){
295+
logSerializationException(signalName, eventId, e);
288296
} catch (InvocationTargetException e) {
289297
Throwable targetException = e.getTargetException();
290298
if (targetException instanceof DataConverterException) {
291-
log.error(
292-
"Failure deserializing signal input for \""
293-
+ signalName
294-
+ "\" at eventID "
295-
+ eventId
296-
+ ". Dropping it.",
297-
targetException);
299+
logSerializationException(signalName, eventId, (DataConverterException)targetException);
298300
} else {
299301
throw new Error(
300302
"Failure processing \"" + signalName + "\" at eventID " + eventId, targetException);
@@ -303,6 +305,17 @@ public void processSignal(String signalName, byte[] input, long eventId) {
303305
}
304306
}
305307

308+
void logSerializationException(String signalName, Long eventId, DataConverterException exception){
309+
log.error(
310+
"Failure deserializing signal input for \""
311+
+ signalName
312+
+ "\" at eventID "
313+
+ eventId
314+
+ ". Dropping it.",
315+
exception);
316+
metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
317+
}
318+
306319
static WorkflowExecutionException mapToWorkflowExecutionException(
307320
Exception failure, DataConverter dataConverter) {
308321
failure = CheckedExceptionWrapper.unwrap(failure);

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public SyncWorkflowWorker(
5555
new SynchronousQueue<>());
5656
factory =
5757
new POJOWorkflowImplementationFactory(
58-
options.getDataConverter(), workflowThreadPool, interceptorFactory);
58+
options.getDataConverter(),
59+
workflowThreadPool,
60+
interceptorFactory,
61+
options.getMetricsScope());
5962
DecisionTaskHandler taskHandler = new ReplayDecisionTaskHandler(domain, factory, options);
6063
worker = new WorkflowWorker(service, domain, taskList, options, taskHandler);
6164
this.options = options;

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import java.util.concurrent.ExecutionException;
8787
import java.util.concurrent.TimeUnit;
8888
import java.util.concurrent.TimeoutException;
89+
import java.util.function.Function;
8990
import org.apache.thrift.TException;
9091
import org.apache.thrift.async.AsyncMethodCallback;
9192
import org.slf4j.Logger;
@@ -111,16 +112,22 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
111112

112113
@Override
113114
public Worker newWorker(String taskList) {
115+
return newWorker(taskList, x->x);
116+
}
117+
118+
@Override
119+
public Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions) {
114120
WorkerOptions.Builder builder =
115-
new WorkerOptions.Builder()
116-
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
117-
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
118-
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
121+
new WorkerOptions.Builder()
122+
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
123+
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
124+
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
119125
if (testEnvironmentOptions.getDataConverter() != null) {
120126
builder.setDataConverter(testEnvironmentOptions.getDataConverter());
121127
}
128+
builder = overrideOptions.apply(builder);
122129
Worker result =
123-
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
130+
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
124131
workers.add(result);
125132
return result;
126133
}

src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal;
2424
import com.uber.cadence.serviceclient.IWorkflowService;
2525
import com.uber.cadence.worker.Worker;
26+
import com.uber.cadence.worker.WorkerOptions;
2627
import java.time.Duration;
28+
import java.util.function.Function;
2729

2830
/**
2931
* TestWorkflowEnvironment provides workflow unit testing capabilities.
@@ -104,6 +106,15 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
104106
*/
105107
Worker newWorker(String taskList);
106108

109+
/**
110+
* Creates a new Worker instance that is connected to the in-memory test Cadence service. {@link
111+
* #close()} calls {@link Worker#shutdown(Duration)} for all workers created through this method.
112+
*
113+
* @param taskList task list to poll.
114+
* @param overrideOptions is used to override the default worker options.
115+
*/
116+
Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions);
117+
107118
/** Creates a WorkflowClient that is connected to the in-memory test Cadence service. */
108119
WorkflowClient newWorkflowClient();
109120

src/test/java/com/uber/cadence/workflow/MetricsTest.java

Lines changed: 115 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,26 @@
2020
import static org.junit.Assert.assertTrue;
2121
import static org.mockito.Matchers.any;
2222
import static org.mockito.Matchers.eq;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.*;
2624

2725
import com.uber.cadence.client.WorkflowClient;
2826
import com.uber.cadence.client.WorkflowOptions;
2927
import com.uber.cadence.internal.metrics.MetricsTag;
28+
import com.uber.cadence.internal.metrics.MetricsType;
3029
import com.uber.cadence.testing.TestEnvironmentOptions;
3130
import com.uber.cadence.testing.TestEnvironmentOptions.Builder;
3231
import com.uber.cadence.testing.TestWorkflowEnvironment;
3332
import com.uber.cadence.worker.Worker;
33+
import com.uber.cadence.workflow.interceptors.SignalWorkflowInterceptor;
3434
import com.uber.m3.tally.RootScopeBuilder;
3535
import com.uber.m3.tally.Scope;
3636
import com.uber.m3.tally.StatsReporter;
3737
import com.uber.m3.tally.Stopwatch;
3838
import com.uber.m3.util.ImmutableMap;
3939
import java.time.Duration;
4040
import java.util.Map;
41-
import org.junit.Before;
41+
import java.util.function.Function;
42+
4243
import org.junit.Rule;
4344
import org.junit.Test;
4445
import org.junit.rules.TestWatcher;
@@ -103,21 +104,79 @@ public void executeChild() {
103104
}
104105
}
105106

106-
@Before
107-
public void setUp() {
107+
public interface ReceiveSignalObjectChildWorkflow {
108+
109+
@WorkflowMethod
110+
String execute();
111+
112+
@SignalMethod(name = "testSignal")
113+
void signal(Signal arg);
114+
115+
@SignalMethod(name = "endWorkflow")
116+
void close();
117+
}
118+
119+
public static class ReceiveSignalObjectChildWorkflowImpl implements ReceiveSignalObjectChildWorkflow {
120+
private String receivedSignal = "Initial State";
121+
// Keep workflow open so that we can send signal
122+
CompletablePromise<Void> promise = Workflow.newPromise();
123+
@Override
124+
public String execute() {
125+
promise.get();
126+
return receivedSignal;
127+
}
128+
129+
@Override
130+
public void signal(Signal arg) {
131+
receivedSignal = arg.value;
132+
}
133+
134+
@Override
135+
public void close() {
136+
promise.complete(null);
137+
}
138+
}
139+
140+
public interface SendSignalObjectWorkflow {
141+
142+
@WorkflowMethod
143+
String execute();
144+
}
145+
146+
public static class SendSignalObjectWorkflowImpl implements SendSignalObjectWorkflow {
147+
@Override
148+
public String execute() {
149+
ReceiveSignalObjectChildWorkflow child =
150+
Workflow.newChildWorkflowStub(ReceiveSignalObjectChildWorkflow.class);
151+
Promise<String> greeting = Async.function(child::execute);
152+
Signal sig = new Signal();
153+
sig.value = "Hello World";
154+
child.signal(sig);
155+
child.close();
156+
return greeting.get();
157+
}
158+
}
159+
160+
public static class Signal {
161+
162+
public String value;
163+
}
164+
165+
public void setUp(com.uber.m3.util.Duration reportingFrequecy){
108166
reporter = mock(StatsReporter.class);
109167
Scope scope =
110-
new RootScopeBuilder()
111-
.reporter(reporter)
112-
.reportEvery(com.uber.m3.util.Duration.ofMillis(10));
168+
new RootScopeBuilder()
169+
.reporter(reporter)
170+
.reportEvery(reportingFrequecy);
113171

114172
TestEnvironmentOptions testOptions =
115-
new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build();
173+
new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build();
116174
testEnvironment = TestWorkflowEnvironment.newInstance(testOptions);
117175
}
118176

119177
@Test
120178
public void testWorkflowMetrics() throws InterruptedException {
179+
setUp(com.uber.m3.util.Duration.ofMillis(10));
121180

122181
Worker worker = testEnvironment.newWorker(taskList);
123182
worker.registerWorkflowImplementationTypes(
@@ -158,4 +217,50 @@ public void testWorkflowMetrics() throws InterruptedException {
158217
sleepDuration.toString(),
159218
sleepDuration.compareTo(com.uber.m3.util.Duration.ofMillis(3100)) < 0);
160219
}
220+
221+
@Test
222+
public void testCorruptedSignalMetrics() throws InterruptedException {
223+
setUp(com.uber.m3.util.Duration.ofMillis(300));
224+
225+
Worker worker = testEnvironment.newWorker(taskList, builder ->
226+
builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory()));
227+
228+
worker.registerWorkflowImplementationTypes(
229+
SendSignalObjectWorkflowImpl.class, ReceiveSignalObjectChildWorkflowImpl.class);
230+
worker.start();
231+
232+
WorkflowOptions options =
233+
new WorkflowOptions.Builder()
234+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(1000))
235+
.setTaskList(taskList)
236+
.build();
237+
238+
WorkflowClient workflowClient = testEnvironment.newWorkflowClient();
239+
SendSignalObjectWorkflow workflow = workflowClient.newWorkflowStub(SendSignalObjectWorkflow.class, options);
240+
workflow.execute();
241+
242+
//Wait for reporter
243+
Thread.sleep(600);
244+
245+
Map<String, String> tags =
246+
new ImmutableMap.Builder<String, String>(2)
247+
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
248+
.put(MetricsTag.TASK_LIST, taskList)
249+
.build();
250+
verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 2);
251+
}
252+
253+
private static class CorruptedSignalWorkflowInterceptorFactory
254+
implements Function<WorkflowInterceptor, WorkflowInterceptor> {
255+
256+
@Override
257+
public WorkflowInterceptor apply(WorkflowInterceptor next) {
258+
return new SignalWorkflowInterceptor(args -> {
259+
if(args != null && args.length > 0){
260+
return new Object [] {"Corrupted Signal"};
261+
}
262+
return args;
263+
}, sig->sig, next);
264+
}
265+
}
161266
}

0 commit comments

Comments
 (0)