Skip to content

Commit cee33b3

Browse files
authored
Added TestWorkflowService that implements Cadence service API. (#126)
* Minor comment fixes * Added SelfAdvancingTimer * All WorkflowTest tests are passing with both docker and in process service
1 parent 54a2c50 commit cee33b3

26 files changed

+907
-249
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
compile(group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3')
4444
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
4545
testCompile group: 'junit', name: 'junit', version: '4.12'
46+
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
4647
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
4748
}
4849

src/main/java/com/uber/cadence/client/WorkflowException.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,25 @@ public class WorkflowException extends RuntimeException {
3131

3232
protected WorkflowException(
3333
String message, WorkflowExecution execution, String workflowType, Throwable cause) {
34-
super(message, cause);
34+
super(getMessage(message, execution, workflowType), cause);
3535
this.execution = execution;
3636
this.workflowType = workflowType;
3737
}
3838

39+
private static String getMessage(
40+
String message, WorkflowExecution execution, String workflowType) {
41+
StringBuilder result = new StringBuilder();
42+
result.append(message);
43+
if (workflowType != null) {
44+
result.append(", WorkflowType=\"");
45+
result.append(workflowType);
46+
result.append("\"");
47+
}
48+
result.append(", WorkflowExecution=\"");
49+
result.append(execution);
50+
return result.toString();
51+
}
52+
3953
public WorkflowExecution getExecution() {
4054
return execution;
4155
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.RespondQueryTaskCompletedRequest;
2626
import com.uber.cadence.WorkflowExecution;
2727
import com.uber.cadence.WorkflowType;
28+
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2829
import com.uber.cadence.internal.worker.DecisionTaskHandler;
2930
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
3031
import java.io.PrintWriter;
@@ -111,7 +112,18 @@ private Result handleDecisionTaskImpl(DecisionTaskWithHistoryIterator decisionTa
111112
DecisionsHelper decisionsHelper = decider.getDecisionsHelper();
112113
List<Decision> decisions = decisionsHelper.getDecisions();
113114
byte[] context = decisionsHelper.getWorkflowContextDataToReturn();
114-
if (log.isDebugEnabled()) {
115+
if (log.isTraceEnabled()) {
116+
WorkflowExecution execution = decisionTask.getWorkflowExecution();
117+
log.trace(
118+
"WorkflowTask startedEventId="
119+
+ decisionTask.getStartedEventId()
120+
+ ", WorkflowID="
121+
+ execution.getWorkflowId()
122+
+ ", RunID="
123+
+ execution.getRunId()
124+
+ " completed with "
125+
+ WorkflowExecutionUtils.prettyPrintDecisions(decisions));
126+
} else if (log.isDebugEnabled()) {
115127
WorkflowExecution execution = decisionTask.getWorkflowExecution();
116128
log.debug(
117129
"WorkflowTask startedEventId="

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void runUntilAllBlocked() throws Throwable {
150150
new WorkflowThreadImpl(
151151
false, threadPool, this, nr.name, false, runnerCancellationScope, nr.runnable);
152152
// It is important to prepend threads as there are callbacks
153-
// like signals that have to apply before any other threads.
153+
// like signals that have to run before any other threads.
154154
// Otherwise signal might be never processed if it was received
155155
// after workflow decided to close.
156156
threads.addFirst(thread);

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.uber.cadence.workflow.ActivityFailureException;
3636
import com.uber.cadence.workflow.ActivityTimeoutException;
3737
import com.uber.cadence.workflow.CancellationScope;
38+
import com.uber.cadence.workflow.ChildWorkflowException;
3839
import com.uber.cadence.workflow.ChildWorkflowFailureException;
3940
import com.uber.cadence.workflow.ChildWorkflowOptions;
4041
import com.uber.cadence.workflow.CompletablePromise;
@@ -49,7 +50,7 @@
4950
import java.util.concurrent.TimeUnit;
5051
import java.util.function.Consumer;
5152

52-
class SyncDecisionContext implements ActivityExecutor {
53+
final class SyncDecisionContext implements ActivityExecutor {
5354
private final DecisionContext context;
5455
private DeterministicRunner runner;
5556
private final DataConverter converter;
@@ -237,7 +238,9 @@ private RuntimeException mapChildWorkflowException(Exception failure) {
237238
if (failure instanceof CancellationException) {
238239
return (CancellationException) failure;
239240
}
240-
241+
if (failure instanceof ChildWorkflowException) {
242+
throw (ChildWorkflowException) failure;
243+
}
241244
if (!(failure instanceof ChildWorkflowTaskFailedException)) {
242245
throw new IllegalArgumentException("Unexpected exception type: ", failure);
243246
}

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public <T> Promise<T> executeActivity(
165165
.setWorkflowId("test-workflow-id")
166166
.setRunId(UUID.randomUUID().toString()));
167167
task.setActivityType(new ActivityType().setName(activityType));
168-
IWorkflowService service = new WorkflowServiceWrapper(testEnvironmentOptions.getService());
168+
IWorkflowService service = new WorkflowServiceWrapper(null);
169169
Result taskResult =
170170
activityTaskHandler.handle(service, testEnvironmentOptions.getDomain(), task);
171171
return Workflow.newPromise(getReply(task, taskResult, returnType));

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ public WorkflowClient newWorkflowClient() {
117117
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
118118
}
119119

120+
@Override
121+
public long currentTimeMillis() {
122+
return service.currentTimeMillis();
123+
}
124+
125+
@Override
126+
public void registerDelayedCallback(Duration delay, Runnable r) {
127+
service.registerDelayedCallback(delay, r);
128+
}
129+
120130
public IWorkflowService getWorkflowService() {
121131
return service;
122132
}
@@ -142,6 +152,10 @@ private WorkflowServiceWrapper() {
142152
impl = new TestWorkflowService();
143153
}
144154

155+
public long currentTimeMillis() {
156+
return impl.currentTimeMillis();
157+
}
158+
145159
@Override
146160
public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(
147161
RecordActivityTaskHeartbeatRequest heartbeatRequest)
@@ -500,5 +514,9 @@ public void getDiagnostics(StringBuilder result) {
500514
public void close() {
501515
impl.close();
502516
}
517+
518+
public void registerDelayedCallback(Duration delay, Runnable r) {
519+
impl.registerDelayedCallback(delay, r);
520+
}
503521
}
504522
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ public void setName(String name) {
161161
if (name == null) {
162162
name = "workflow-" + super.hashCode();
163163
}
164-
log.trace(String.format("Workflow thread \"%s\" created", name));
165164
this.task = new RunnableWrapper(context, name, detached, parentCancellationScope, runnable);
166165
}
167166

@@ -172,14 +171,11 @@ public boolean isDetached() {
172171

173172
@Override
174173
public void cancel() {
175-
log.trace(String.format("Workflow thread \"%s\" cancel called", getName()));
176174
task.cancellationScope.cancel();
177175
}
178176

179177
@Override
180178
public void cancel(String reason) {
181-
log.trace(
182-
String.format("Workflow thread \"%s cancel called with \"%s\" reason", getName(), reason));
183179
task.cancellationScope.cancel(reason);
184180
}
185181

@@ -203,7 +199,6 @@ public void start() {
203199
if (context.getStatus() != Status.CREATED) {
204200
throw new IllegalThreadStateException("already started");
205201
}
206-
log.trace(String.format("Workflow thread \"%s\" started", getName()));
207202
context.setStatus(Status.RUNNING);
208203
try {
209204
taskFuture = threadPool.submit(task);

src/main/java/com/uber/cadence/internal/testservice/RequestContext.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ Runnable getCallback() {
7373
private final List<Timer> timers = new ArrayList<>();
7474
private long workflowCompletedAtEventId = -1;
7575
private boolean needDecision;
76+
// How many times call SelfAdvancedTimer#lockTimeSkipping.
77+
// Negative means how many times to call SelfAdvancedTimer#unlockTimeSkipping.
78+
private int timerLocks;
7679

7780
/**
7881
* Creates an instance of the RequestContext
@@ -95,6 +98,18 @@ void add(RequestContext ctx) {
9598
this.events.addAll(ctx.getEvents());
9699
}
97100

101+
void lockTimer() {
102+
timerLocks++;
103+
}
104+
105+
void unlockTimer() {
106+
timerLocks--;
107+
}
108+
109+
public int getTimerLocks() {
110+
return timerLocks;
111+
}
112+
98113
long currentTimeInNanoseconds() {
99114
return clock.getAsLong() * NANOS_PER_MILLIS;
100115
}
@@ -193,4 +208,8 @@ void fireCallbacks() throws InternalServiceError {
193208
ExecutionId getExecutionId() {
194209
return executionId;
195210
}
211+
212+
public boolean isEmpty() {
213+
return events.isEmpty() && activityTasks.isEmpty() && decisionTask == null && timers.isEmpty();
214+
}
196215
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.testservice;
19+
20+
import java.time.Duration;
21+
import java.util.function.LongSupplier;
22+
23+
/**
24+
* Timer service that automatically forwards current time to the next task time when is not locked
25+
* through {@link #lockTimeSkipping()}.
26+
*/
27+
public interface SelfAdvancingTimer {
28+
29+
/**
30+
* Schedule a task with a specified delay. The actual wait time is defined by the internal clock
31+
* that might advance much faster than the wall clock.
32+
*/
33+
void schedule(Duration delay, Runnable task);
34+
35+
/** Supplier that returns current time of the timer when called. */
36+
LongSupplier getClock();
37+
38+
/**
39+
* Prohibit automatic time skipping until {@link #unlockTimeSkipping()} is called. Locks and
40+
* unlocks are counted.
41+
*/
42+
void lockTimeSkipping();
43+
44+
void unlockTimeSkipping();
45+
46+
/**
47+
* Update lock count. The same as calling lockTimeSkipping count number of times for positive
48+
* count and unlockTimeSkipping for negative count.
49+
*
50+
* @param count
51+
*/
52+
void updateLocks(int count);
53+
54+
void shutdown();
55+
}

0 commit comments

Comments
 (0)