Skip to content

Commit a24d9b7

Browse files
committed
add unit for reporting metrics for corrupted signal
1 parent fa4e359 commit a24d9b7

11 files changed

+405
-18
lines changed

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -284,23 +284,18 @@ public void processSignal(String signalName, byte[] input, long eventId) {
284284
+ signalHandlers.keySet());
285285
return;
286286
}
287-
Object[] args = dataConverter.fromDataArray(input, signalMethod.getParameterTypes());
288287
try {
288+
Object[] args = dataConverter.fromDataArray(input, signalMethod.getParameterTypes());
289289
newInstance();
290290
signalMethod.invoke(workflow, args);
291291
} catch (IllegalAccessException e) {
292292
throw new Error("Failure processing \"" + signalName + "\" at eventID " + eventId, e);
293+
} catch (DataConverterException e){
294+
logSerializationException(signalName, eventId, e);
293295
} catch (InvocationTargetException e) {
294296
Throwable targetException = e.getTargetException();
295297
if (targetException instanceof DataConverterException) {
296-
log.error(
297-
"Failure deserializing signal input for \""
298-
+ signalName
299-
+ "\" at eventID "
300-
+ eventId
301-
+ ". Dropping it.",
302-
targetException);
303-
metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
298+
logSerializationException(signalName, eventId, (DataConverterException)targetException);
304299
} else {
305300
throw new Error(
306301
"Failure processing \"" + signalName + "\" at eventID " + eventId, targetException);
@@ -309,6 +304,17 @@ public void processSignal(String signalName, byte[] input, long eventId) {
309304
}
310305
}
311306

307+
void logSerializationException(String signalName, Long eventId, DataConverterException exception){
308+
log.error(
309+
"Failure deserializing signal input for \""
310+
+ signalName
311+
+ "\" at eventID "
312+
+ eventId
313+
+ ". Dropping it.",
314+
exception);
315+
metricsScope.counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
316+
}
317+
312318
static WorkflowExecutionException mapToWorkflowExecutionException(
313319
Exception failure, DataConverter dataConverter) {
314320
failure = CheckedExceptionWrapper.unwrap(failure);

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import java.util.concurrent.ExecutionException;
8686
import java.util.concurrent.TimeUnit;
8787
import java.util.concurrent.TimeoutException;
88+
import java.util.function.Function;
8889
import org.apache.thrift.TException;
8990
import org.apache.thrift.async.AsyncMethodCallback;
9091
import org.slf4j.Logger;
@@ -110,16 +111,22 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
110111

111112
@Override
112113
public Worker newWorker(String taskList) {
114+
return newWorker(taskList, x->x);
115+
}
116+
117+
@Override
118+
public Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions) {
113119
WorkerOptions.Builder builder =
114-
new WorkerOptions.Builder()
115-
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
116-
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
117-
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
120+
new WorkerOptions.Builder()
121+
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
122+
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
123+
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay());
118124
if (testEnvironmentOptions.getDataConverter() != null) {
119125
builder.setDataConverter(testEnvironmentOptions.getDataConverter());
120126
}
127+
builder = overrideOptions.apply(builder);
121128
Worker result =
122-
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
129+
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
123130
workers.add(result);
124131
return result;
125132
}

src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal;
2424
import com.uber.cadence.serviceclient.IWorkflowService;
2525
import com.uber.cadence.worker.Worker;
26+
import com.uber.cadence.worker.WorkerOptions;
2627
import java.time.Duration;
28+
import java.util.function.Function;
2729

2830
/**
2931
* TestWorkflowEnvironment provides workflow unit testing capabilities.
@@ -104,6 +106,15 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
104106
*/
105107
Worker newWorker(String taskList);
106108

109+
/**
110+
* Creates a new Worker instance that is connected to the in-memory test Cadence service. {@link
111+
* #close()} calls {@link Worker#shutdown(Duration)} for all workers created through this method.
112+
*
113+
* @param taskList task list to poll.
114+
* @param overrideOptions is used to override the default worker options.
115+
*/
116+
Worker newWorker(String taskList, Function<WorkerOptions.Builder,WorkerOptions.Builder> overrideOptions);
117+
107118
/** Creates a WorkflowClient that is connected to the in-memory test Cadence service. */
108119
WorkflowClient newWorkflowClient();
109120

src/test/java/com/uber/cadence/workflow/MetricsTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020
import static org.junit.Assert.assertTrue;
2121
import static org.mockito.Matchers.any;
2222
import static org.mockito.Matchers.eq;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.*;
2624

2725
import com.uber.cadence.client.WorkflowClient;
2826
import com.uber.cadence.client.WorkflowOptions;
2927
import com.uber.cadence.internal.metrics.MetricsTag;
28+
import com.uber.cadence.internal.metrics.MetricsType;
3029
import com.uber.cadence.testing.TestEnvironmentOptions;
3130
import com.uber.cadence.testing.TestEnvironmentOptions.Builder;
3231
import com.uber.cadence.testing.TestWorkflowEnvironment;
3332
import com.uber.cadence.worker.Worker;
33+
import com.uber.cadence.workflow.interceptors.CorruptedSignalWorkflowInterceptorFactory;
34+
import com.uber.cadence.workflow.samples.ReceiveSignalObject_ChildWorkflowImpl;
35+
import com.uber.cadence.workflow.samples.SendSignalObject_Workflow;
36+
import com.uber.cadence.workflow.samples.SendSignalObject_WorkflowImpl;
3437
import com.uber.m3.tally.RootScopeBuilder;
3538
import com.uber.m3.tally.Scope;
3639
import com.uber.m3.tally.StatsReporter;
@@ -109,7 +112,7 @@ public void setUp() {
109112
Scope scope =
110113
new RootScopeBuilder()
111114
.reporter(reporter)
112-
.reportEvery(com.uber.m3.util.Duration.ofMillis(10));
115+
.reportEvery(com.uber.m3.util.Duration.ofMillis(300));
113116

114117
TestEnvironmentOptions testOptions =
115118
new Builder().setDomain(WorkflowTest.DOMAIN).setMetricsScope(scope).build();
@@ -158,4 +161,34 @@ public void testWorkflowMetrics() throws InterruptedException {
158161
sleepDuration.toString(),
159162
sleepDuration.compareTo(com.uber.m3.util.Duration.ofMillis(3100)) < 0);
160163
}
164+
165+
@Test
166+
public void testCorruptedSignalMetrics() throws InterruptedException {
167+
Worker worker = testEnvironment.newWorker(taskList, builder ->
168+
builder.setInterceptorFactory(new CorruptedSignalWorkflowInterceptorFactory()));
169+
170+
worker.registerWorkflowImplementationTypes(
171+
SendSignalObject_WorkflowImpl.class, ReceiveSignalObject_ChildWorkflowImpl.class);
172+
worker.start();
173+
174+
WorkflowOptions options =
175+
new WorkflowOptions.Builder()
176+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(1000))
177+
.setTaskList(taskList)
178+
.build();
179+
180+
WorkflowClient workflowClient = testEnvironment.newWorkflowClient();
181+
SendSignalObject_Workflow workflow = workflowClient.newWorkflowStub(SendSignalObject_Workflow.class, options);
182+
workflow.execute();
183+
184+
//Wait for reporter
185+
Thread.sleep(500);
186+
187+
Map<String, String> tags =
188+
new ImmutableMap.Builder<String, String>(2)
189+
.put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN)
190+
.put(MetricsTag.TASK_LIST, taskList)
191+
.build();
192+
verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 2);
193+
}
161194
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow.interceptors;
19+
20+
import com.uber.cadence.workflow.WorkflowInterceptor;
21+
22+
import java.util.function.Function;
23+
24+
public class CorruptedSignalWorkflowInterceptorFactory
25+
implements Function<WorkflowInterceptor, WorkflowInterceptor> {
26+
27+
@Override
28+
public WorkflowInterceptor apply(WorkflowInterceptor next) {
29+
return new SignalWorkflowInterceptor( args -> {
30+
if(args != null && args.length > 0){
31+
return new Object [] {"Corrupted Signal"};
32+
}
33+
return args;
34+
}, sig->sig, next);
35+
}
36+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow.interceptors;
19+
20+
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.activity.ActivityOptions;
22+
import com.uber.cadence.workflow.*;
23+
import java.time.Duration;
24+
import java.util.Objects;
25+
import java.util.Optional;
26+
import java.util.Random;
27+
import java.util.UUID;
28+
import java.util.function.BiPredicate;
29+
import java.util.function.Function;
30+
import java.util.function.Supplier;
31+
32+
public class SignalWorkflowInterceptor implements WorkflowInterceptor {
33+
34+
private Function<Object[], Object[]> overrideArgs;
35+
private Function<String, String> overrideSignalName;
36+
private final WorkflowInterceptor next;
37+
38+
public SignalWorkflowInterceptor(Function<Object[],Object[]> overrideArgs,
39+
Function<String,String> overrideSignalName,
40+
WorkflowInterceptor next) {
41+
this.overrideArgs = overrideArgs;
42+
this.overrideSignalName = overrideSignalName;
43+
this.next = Objects.requireNonNull(next);
44+
}
45+
46+
@Override
47+
public <R> Promise<R> executeActivity(
48+
String activityName, Class<R> returnType, Object[] args, ActivityOptions options) {
49+
return next.executeActivity(activityName, returnType, args, options);
50+
}
51+
52+
@Override
53+
public <R> WorkflowResult<R> executeChildWorkflow(
54+
String workflowType, Class<R> returnType, Object[] args, ChildWorkflowOptions options) {
55+
return next.executeChildWorkflow(workflowType, returnType, args, options);
56+
}
57+
58+
@Override
59+
public Random newRandom() {
60+
return next.newRandom();
61+
}
62+
63+
@Override
64+
public Promise<Void> signalExternalWorkflow(
65+
WorkflowExecution execution, String signalName, Object[] args) {
66+
if(args != null && args.length > 0){
67+
args = new Object[] {"corrupted signal"};
68+
}
69+
return next.signalExternalWorkflow(execution, overrideSignalName.apply(signalName), overrideArgs.apply(args));
70+
}
71+
72+
@Override
73+
public Promise<Void> cancelWorkflow(WorkflowExecution execution) {
74+
return next.cancelWorkflow(execution);
75+
}
76+
77+
@Override
78+
public void sleep(Duration duration) {
79+
next.sleep(duration);
80+
}
81+
82+
@Override
83+
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
84+
return next.await(timeout, reason, unblockCondition);
85+
}
86+
87+
@Override
88+
public void await(String reason, Supplier<Boolean> unblockCondition) {
89+
next.await(reason, unblockCondition);
90+
}
91+
92+
@Override
93+
public Promise<Void> newTimer(Duration duration) {
94+
return next.newTimer(duration);
95+
}
96+
97+
@Override
98+
public <R> R sideEffect(Class<R> resultType, Functions.Func<R> func) {
99+
return next.sideEffect(resultType, func);
100+
}
101+
102+
@Override
103+
public <R> R mutableSideEffect(
104+
String id, Class<R> returnType, BiPredicate<R, R> updated, Functions.Func<R> func) {
105+
return next.mutableSideEffect(id, returnType, updated, func);
106+
}
107+
108+
@Override
109+
public int getVersion(String changeID, int minSupported, int maxSupported) {
110+
return next.getVersion(changeID, minSupported, maxSupported);
111+
}
112+
113+
@Override
114+
public void continueAsNew(
115+
Optional<String> workflowType, Optional<ContinueAsNewOptions> options, Object[] args) {
116+
next.continueAsNew(workflowType, options, args);
117+
}
118+
119+
@Override
120+
public void registerQuery(
121+
String queryType, Class<?>[] argTypes, Functions.Func1<Object[], Object> callback) {
122+
next.registerQuery(queryType, argTypes, callback);
123+
}
124+
125+
@Override
126+
public UUID randomUUID() {
127+
return next.randomUUID();
128+
}
129+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow.samples;
19+
20+
import com.uber.cadence.workflow.SignalMethod;
21+
import com.uber.cadence.workflow.WorkflowMethod;
22+
import com.uber.cadence.workflow.samples.Signal;
23+
24+
public interface ReceiveSignalObject_ChildWorkflow {
25+
26+
@WorkflowMethod
27+
String execute();
28+
29+
@SignalMethod(name = "testSignal")
30+
void signal(Signal arg);
31+
32+
@SignalMethod(name = "endWorkflow")
33+
void close();
34+
}

0 commit comments

Comments
 (0)