Skip to content

Commit ce87dbf

Browse files
authored
Added support for WorkflowIdReusePolicy to TestWorkflowService (#155)
1 parent f6bf98a commit ce87dbf

File tree

7 files changed

+174
-20
lines changed

7 files changed

+174
-20
lines changed

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ Consumer<Exception> startChildWorkflow(
108108
String workflowId = parameters.getWorkflowId();
109109
if (workflowId == null) {
110110
workflowId = generateUniqueId();
111+
} else {
112+
if (scheduledExternalWorkflows.containsKey(workflowId)) {
113+
throw new StartChildWorkflowFailedException(
114+
0,
115+
new WorkflowExecution().setWorkflowId(workflowId),
116+
attributes.getWorkflowType(),
117+
ChildWorkflowExecutionFailedCause.WORKFLOW_ALREADY_RUNNING);
118+
}
111119
}
112120
attributes.setWorkflowId(workflowId);
113121
if (parameters.getDomain() == null) {

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static com.uber.cadence.internal.testservice.StateMachines.State.CANCELED;
3030
import static com.uber.cadence.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
3131
import static com.uber.cadence.internal.testservice.StateMachines.State.COMPLETED;
32+
import static com.uber.cadence.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
3233
import static com.uber.cadence.internal.testservice.StateMachines.State.FAILED;
3334
import static com.uber.cadence.internal.testservice.StateMachines.State.INITIATED;
3435
import static com.uber.cadence.internal.testservice.StateMachines.State.NONE;
@@ -124,7 +125,8 @@ enum State {
124125
TIMED_OUT,
125126
CANCELLATION_REQUESTED,
126127
CANCELED,
127-
COMPLETED
128+
COMPLETED,
129+
CONTINUED_AS_NEW,
128130
}
129131

130132
enum Action {
@@ -201,7 +203,7 @@ static StateMachine<WorkflowData> newWorkflowStateMachine() {
201203
return new StateMachine<>(new WorkflowData())
202204
.add(NONE, START, STARTED, StateMachines::startWorkflow)
203205
.add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
204-
.add(STARTED, CONTINUE_AS_NEW, COMPLETED, StateMachines::continueAsNewWorkflow)
206+
.add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
205207
.add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
206208
.add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
207209
.add(
@@ -308,6 +310,12 @@ private static void startChildWorkflowFailed(
308310
ChildWorkflowData data,
309311
StartChildWorkflowExecutionFailedEventAttributes a,
310312
long notUsed) {
313+
a.setInitiatedEventId(data.initiatedEventId);
314+
a.setWorkflowType(data.initiatedEvent.getWorkflowType());
315+
a.setWorkflowId(data.initiatedEvent.getWorkflowId());
316+
if (data.initiatedEvent.isSetDomain()) {
317+
a.setDomain(data.initiatedEvent.getDomain());
318+
}
311319
HistoryEvent event =
312320
new HistoryEvent()
313321
.setEventType(EventType.StartChildWorkflowExecutionFailed)
@@ -353,6 +361,11 @@ private static void childWorkflowFailed(
353361
long notUsed) {
354362
a.setInitiatedEventId(data.initiatedEventId);
355363
a.setStartedEventId(data.startedEventId);
364+
a.setWorkflowExecution(data.execution);
365+
a.setWorkflowType(data.initiatedEvent.getWorkflowType());
366+
if (data.initiatedEvent.domain != null) {
367+
a.setDomain(data.initiatedEvent.domain);
368+
}
356369
HistoryEvent event =
357370
new HistoryEvent()
358371
.setEventType(EventType.ChildWorkflowExecutionFailed)

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,18 @@
4848
import com.uber.cadence.SignalWorkflowExecutionRequest;
4949
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
5050
import com.uber.cadence.StartWorkflowExecutionRequest;
51+
import com.uber.cadence.WorkflowExecutionCloseStatus;
5152
import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId;
53+
import java.util.Optional;
5254
import org.apache.thrift.TException;
5355

5456
interface TestWorkflowMutableState {
5557

5658
ExecutionId getExecutionId();
5759

60+
/** @return close status of the workflow or empty if still open */
61+
Optional<WorkflowExecutionCloseStatus> getCloseStatus();
62+
5863
StartWorkflowExecutionRequest getStartRequest();
5964

6065
void startDecisionTask(PollForDecisionTaskResponse task, PollForDecisionTaskRequest pollRequest)

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import com.uber.cadence.StartTimerDecisionAttributes;
6868
import com.uber.cadence.StartWorkflowExecutionRequest;
6969
import com.uber.cadence.TimeoutType;
70+
import com.uber.cadence.WorkflowExecutionCloseStatus;
7071
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
7172
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
7273
import com.uber.cadence.internal.testservice.StateMachines.Action;
@@ -192,6 +193,26 @@ public ExecutionId getExecutionId() {
192193
return executionId;
193194
}
194195

196+
@Override
197+
public Optional<WorkflowExecutionCloseStatus> getCloseStatus() {
198+
switch (workflow.getState()) {
199+
case NONE:
200+
case INITIATED:
201+
case STARTED:
202+
case CANCELLATION_REQUESTED:
203+
return Optional.empty();
204+
case FAILED:
205+
return Optional.of(WorkflowExecutionCloseStatus.FAILED);
206+
case TIMED_OUT:
207+
return Optional.of(WorkflowExecutionCloseStatus.TIMED_OUT);
208+
case CANCELED:
209+
return Optional.of(WorkflowExecutionCloseStatus.CANCELED);
210+
case COMPLETED:
211+
return Optional.of(WorkflowExecutionCloseStatus.COMPLETED);
212+
}
213+
throw new IllegalStateException("unreachable");
214+
}
215+
195216
@Override
196217
public StartWorkflowExecutionRequest getStartRequest() {
197218
return startRequest;
@@ -581,11 +602,10 @@ public void childWorklfowTimedOut(
581602

582603
@Override
583604
public void failStartChildWorkflow(
584-
String activityId, StartChildWorkflowExecutionFailedEventAttributes a)
605+
String childId, StartChildWorkflowExecutionFailedEventAttributes a)
585606
throws InternalServiceError, EntityNotExistsError, BadRequestError {
586607
update(
587608
ctx -> {
588-
String childId = a.getWorkflowId();
589609
StateMachine<ChildWorkflowData> child = getChildWorkflow(childId);
590610
child.action(StateMachines.Action.FAIL, ctx, a, 0);
591611
childWorkflows.remove(childId);

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@
6666
import com.uber.cadence.UpdateDomainResponse;
6767
import com.uber.cadence.WorkflowExecution;
6868
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
69+
import com.uber.cadence.WorkflowExecutionCloseStatus;
6970
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
7071
import com.uber.cadence.WorkflowExecutionFilter;
7172
import com.uber.cadence.WorkflowExecutionInfo;
73+
import com.uber.cadence.WorkflowIdReusePolicy;
7274
import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId;
7375
import com.uber.cadence.internal.testservice.TestWorkflowStore.WorkflowState;
7476
import com.uber.cadence.serviceclient.IWorkflowService;
@@ -102,7 +104,7 @@ public final class TestWorkflowService implements IWorkflowService {
102104
private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
103105

104106
// key->WorkflowId
105-
private final Map<WorkflowId, TestWorkflowMutableState> openExecutions = new HashMap<>();
107+
private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
106108

107109
public void close() {
108110
store.close();
@@ -129,7 +131,7 @@ private TestWorkflowMutableState getMutableState(WorkflowId workflowId)
129131
throws EntityNotExistsError {
130132
lock.lock();
131133
try {
132-
TestWorkflowMutableState mutableState = openExecutions.get(workflowId);
134+
TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
133135
if (mutableState == null) {
134136
throw new EntityNotExistsError("Execution not found in mutable state: " + workflowId);
135137
}
@@ -177,27 +179,45 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
177179
String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
178180
String domain = requireNotNull("Domain", startRequest.getDomain());
179181
WorkflowId workflowId = new WorkflowId(domain, requestWorkflowId);
180-
TestWorkflowMutableState running;
182+
TestWorkflowMutableState existing;
181183
lock.lock();
182184
try {
183-
running = openExecutions.get(workflowId);
185+
existing = executionsByWorkflowId.get(workflowId);
184186
} finally {
185187
lock.unlock();
186188
}
187-
if (running != null) {
188-
WorkflowExecutionAlreadyStartedError error = new WorkflowExecutionAlreadyStartedError();
189-
WorkflowExecution execution = running.getExecutionId().getExecution();
190-
error.setMessage(
191-
String.format(
192-
"Workflow execution already running. WorkflowId: %s, " + "RunId: %s",
193-
execution.getWorkflowId(), execution.getRunId()));
194-
error.setRunId(execution.getRunId());
195-
error.setStartRequestId(startRequest.getRequestId());
196-
throw error;
189+
if (existing != null) {
190+
Optional<WorkflowExecutionCloseStatus> statusOptional = existing.getCloseStatus();
191+
WorkflowIdReusePolicy policy =
192+
startRequest.isSetWorkflowIdReusePolicy()
193+
? startRequest.getWorkflowIdReusePolicy()
194+
: WorkflowIdReusePolicy.AllowDuplicateFailedOnly;
195+
if (!statusOptional.isPresent() || policy == WorkflowIdReusePolicy.RejectDuplicate) {
196+
return throwDuplicatedWorkflow(startRequest, existing);
197+
}
198+
WorkflowExecutionCloseStatus status = statusOptional.get();
199+
if (policy == WorkflowIdReusePolicy.AllowDuplicateFailedOnly
200+
&& (status == WorkflowExecutionCloseStatus.COMPLETED
201+
|| status == WorkflowExecutionCloseStatus.CONTINUED_AS_NEW)) {
202+
return throwDuplicatedWorkflow(startRequest, existing);
203+
}
197204
}
198205
return startWorkflowExecutionNoRunningCheck(startRequest, parent, workflowId);
199206
}
200207

208+
private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
209+
StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing)
210+
throws WorkflowExecutionAlreadyStartedError {
211+
WorkflowExecutionAlreadyStartedError error = new WorkflowExecutionAlreadyStartedError();
212+
WorkflowExecution execution = existing.getExecutionId().getExecution();
213+
error.setMessage(
214+
String.format(
215+
"WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId()));
216+
error.setRunId(execution.getRunId());
217+
error.setStartRequestId(startRequest.getRequestId());
218+
throw error;
219+
}
220+
201221
private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheck(
202222
StartWorkflowExecutionRequest startRequest,
203223
Optional<TestWorkflowMutableState> parent,
@@ -210,7 +230,7 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheck(
210230
ExecutionId executionId = new ExecutionId(domain, execution);
211231
lock.lock();
212232
try {
213-
openExecutions.put(workflowId, result);
233+
executionsByWorkflowId.put(workflowId, result);
214234
executions.put(executionId, result);
215235
} finally {
216236
lock.unlock();

src/main/java/com/uber/cadence/workflow/ChildWorkflowException.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ protected ChildWorkflowException(
3434
WorkflowExecution workflowExecution,
3535
WorkflowType workflowType) {
3636
super(
37-
"WorkflowType=\""
37+
message
38+
+ " WorkflowType=\""
3839
+ workflowType.getName()
3940
+ "\", ID=\""
4041
+ workflowExecution.getWorkflowId()

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,7 @@ public interface WaitOnSignalWorkflow {
10271027
}
10281028

10291029
public static class TestWaitOnSignalWorkflowImpl implements WaitOnSignalWorkflow {
1030+
10301031
private final CompletablePromise<String> signal = Workflow.newPromise();
10311032

10321033
@Override
@@ -1832,6 +1833,92 @@ public void testChildWorkflow() {
18321833
assertEquals("HELLO WORLD!", client.execute(taskList));
18331834
}
18341835

1836+
private static String childReexecuteId = UUID.randomUUID().toString();
1837+
1838+
public interface WorkflowIdReusePolicyParent {
1839+
1840+
@WorkflowMethod
1841+
String execute(boolean parallel, WorkflowIdReusePolicy policy);
1842+
}
1843+
1844+
public static class TestChildReexecuteWorkflow implements WorkflowIdReusePolicyParent {
1845+
1846+
public TestChildReexecuteWorkflow() {}
1847+
1848+
@Override
1849+
public String execute(boolean parallel, WorkflowIdReusePolicy policy) {
1850+
ChildWorkflowOptions options =
1851+
new ChildWorkflowOptions.Builder()
1852+
.setWorkflowId(childReexecuteId)
1853+
.setWorkflowIdReusePolicy(policy)
1854+
.build();
1855+
1856+
ITestNamedChild child1 = Workflow.newChildWorkflowStub(ITestNamedChild.class, options);
1857+
Promise<String> r1P = Async.function(child1::execute, "Hello ");
1858+
String r1 = null;
1859+
if (!parallel) {
1860+
r1 = r1P.get();
1861+
}
1862+
ITestNamedChild child2 = Workflow.newChildWorkflowStub(ITestNamedChild.class, options);
1863+
String r2 = child2.execute("World!");
1864+
if (parallel) {
1865+
r1 = r1P.get();
1866+
}
1867+
assertEquals(childReexecuteId, Workflow.getWorkflowExecution(child1).get().getWorkflowId());
1868+
assertEquals(childReexecuteId, Workflow.getWorkflowExecution(child2).get().getWorkflowId());
1869+
return r1 + r2;
1870+
}
1871+
}
1872+
1873+
@Test
1874+
public void testChildAlreadyRunning() {
1875+
startWorkerFor(TestChildReexecuteWorkflow.class, TestNamedChild.class);
1876+
1877+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
1878+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200));
1879+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(60));
1880+
options.setTaskList(taskList);
1881+
WorkflowIdReusePolicyParent client =
1882+
workflowClient.newWorkflowStub(WorkflowIdReusePolicyParent.class, options.build());
1883+
try {
1884+
client.execute(false, WorkflowIdReusePolicy.RejectDuplicate);
1885+
fail("unreachable");
1886+
} catch (WorkflowFailureException e) {
1887+
assertTrue(e.getCause() instanceof StartChildWorkflowFailedException);
1888+
}
1889+
}
1890+
1891+
@Test
1892+
public void testChildStartTwice() {
1893+
startWorkerFor(TestChildReexecuteWorkflow.class, TestNamedChild.class);
1894+
1895+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
1896+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200));
1897+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(60));
1898+
options.setTaskList(taskList);
1899+
WorkflowIdReusePolicyParent client =
1900+
workflowClient.newWorkflowStub(WorkflowIdReusePolicyParent.class, options.build());
1901+
try {
1902+
client.execute(true, WorkflowIdReusePolicy.RejectDuplicate);
1903+
fail("unreachable");
1904+
} catch (WorkflowFailureException e) {
1905+
assertTrue(e.getCause() instanceof StartChildWorkflowFailedException);
1906+
}
1907+
}
1908+
1909+
@Test
1910+
public void testChildReexecute() {
1911+
startWorkerFor(TestChildReexecuteWorkflow.class, TestNamedChild.class);
1912+
1913+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
1914+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200));
1915+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(60));
1916+
options.setTaskList(taskList);
1917+
WorkflowIdReusePolicyParent client =
1918+
workflowClient.newWorkflowStub(WorkflowIdReusePolicyParent.class, options.build());
1919+
assertEquals("HELLO WORLD!", client.execute(false, WorkflowIdReusePolicy.AllowDuplicate));
1920+
}
1921+
18351922
public static class TestChildWorkflowRetryWorkflow implements TestWorkflow1 {
18361923

18371924
private ITestChild child;

0 commit comments

Comments
 (0)