Skip to content

Commit 8714d94

Browse files
authored
Fixed UUID generation bug when using TestWorkflowService (#451)
1 parent 42e570d commit 8714d94

File tree

6 files changed

+206
-21
lines changed

6 files changed

+206
-21
lines changed

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,7 @@ public interface DecisionContext extends ReplayAware {
8787
*/
8888
Map<String, Object> getPropagatedContexts();
8989

90-
/**
91-
* Returns the set of configured context propagators
92-
*
93-
* @return
94-
*/
90+
/** Returns the set of configured context propagators */
9591
List<ContextPropagator> getContextPropagators();
9692

9793
/**

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters continue
233233
/** Replay safe UUID */
234234
UUID randomUUID() {
235235
String runId = workflowContext.getCurrentRunId();
236+
if (runId == null) {
237+
throw new Error("null currentRunId");
238+
}
236239
String id = runId + ":" + decisions.getAndIncrementNextId();
237240
byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
238241
return UUID.nameUUIDFromBytes(bytes);

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,16 +152,22 @@ static final class WorkflowData {
152152
int backoffStartIntervalInSeconds;
153153
String cronSchedule;
154154
byte[] lastCompletionResult;
155+
String originalExecutionRunId;
156+
Optional<String> continuedExecutionRunId;
155157

156158
WorkflowData(
157159
Optional<RetryState> retryState,
158160
int backoffStartIntervalInSeconds,
159161
String cronSchedule,
160-
byte[] lastCompletionResult) {
162+
byte[] lastCompletionResult,
163+
String originalExecutionRunId,
164+
Optional<String> continuedExecutionRunId) {
161165
this.retryState = retryState;
162166
this.backoffStartIntervalInSeconds = backoffStartIntervalInSeconds;
163167
this.cronSchedule = cronSchedule;
164168
this.lastCompletionResult = lastCompletionResult;
169+
this.originalExecutionRunId = originalExecutionRunId;
170+
this.continuedExecutionRunId = continuedExecutionRunId;
165171
}
166172
}
167173

@@ -539,6 +545,10 @@ private static void startWorkflow(
539545
if (data.retryState.isPresent()) {
540546
a.setAttempt(data.retryState.get().getAttempt());
541547
}
548+
a.setOriginalExecutionRunId(data.originalExecutionRunId);
549+
if (data.continuedExecutionRunId.isPresent()) {
550+
a.setContinuedExecutionRunId(data.continuedExecutionRunId.get());
551+
}
542552
a.setLastCompletionResult(data.lastCompletionResult);
543553
a.setMemo(request.getMemo());
544554
a.setSearchAttributes((request.getSearchAttributes()));

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,66 @@
2525
import com.cronutils.parser.CronParser;
2626
import com.google.common.base.Strings;
2727
import com.google.common.base.Throwables;
28-
import com.uber.cadence.*;
28+
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
29+
import com.uber.cadence.BadRequestError;
30+
import com.uber.cadence.CancelTimerDecisionAttributes;
31+
import com.uber.cadence.CancelTimerFailedEventAttributes;
32+
import com.uber.cadence.CancelWorkflowExecutionDecisionAttributes;
33+
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
34+
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
35+
import com.uber.cadence.ChildWorkflowExecutionFailedEventAttributes;
36+
import com.uber.cadence.ChildWorkflowExecutionStartedEventAttributes;
37+
import com.uber.cadence.ChildWorkflowExecutionTimedOutEventAttributes;
38+
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
39+
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
40+
import com.uber.cadence.Decision;
41+
import com.uber.cadence.DecisionTaskFailedCause;
42+
import com.uber.cadence.EntityNotExistsError;
43+
import com.uber.cadence.EventType;
44+
import com.uber.cadence.FailWorkflowExecutionDecisionAttributes;
45+
import com.uber.cadence.HistoryEvent;
46+
import com.uber.cadence.InternalServiceError;
47+
import com.uber.cadence.MarkerRecordedEventAttributes;
48+
import com.uber.cadence.PollForActivityTaskRequest;
49+
import com.uber.cadence.PollForActivityTaskResponse;
50+
import com.uber.cadence.PollForDecisionTaskRequest;
51+
import com.uber.cadence.PollForDecisionTaskResponse;
52+
import com.uber.cadence.QueryFailedError;
53+
import com.uber.cadence.QueryRejectCondition;
54+
import com.uber.cadence.QueryRejected;
55+
import com.uber.cadence.QueryTaskCompletedType;
56+
import com.uber.cadence.QueryWorkflowRequest;
57+
import com.uber.cadence.QueryWorkflowResponse;
58+
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
59+
import com.uber.cadence.RecordMarkerDecisionAttributes;
60+
import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes;
61+
import com.uber.cadence.RequestCancelActivityTaskFailedEventAttributes;
62+
import com.uber.cadence.RequestCancelExternalWorkflowExecutionDecisionAttributes;
63+
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
64+
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
65+
import com.uber.cadence.RespondActivityTaskCanceledRequest;
66+
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
67+
import com.uber.cadence.RespondActivityTaskCompletedRequest;
68+
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
69+
import com.uber.cadence.RespondActivityTaskFailedRequest;
70+
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
71+
import com.uber.cadence.RespondDecisionTaskFailedRequest;
72+
import com.uber.cadence.RespondQueryTaskCompletedRequest;
73+
import com.uber.cadence.RetryPolicy;
74+
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
75+
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
76+
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
77+
import com.uber.cadence.SignalWorkflowExecutionRequest;
78+
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
79+
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
80+
import com.uber.cadence.StartTimerDecisionAttributes;
81+
import com.uber.cadence.StartWorkflowExecutionRequest;
82+
import com.uber.cadence.StickyExecutionAttributes;
83+
import com.uber.cadence.TimeoutType;
84+
import com.uber.cadence.WorkflowExecution;
85+
import com.uber.cadence.WorkflowExecutionCloseStatus;
86+
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
87+
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
2988
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3089
import com.uber.cadence.internal.testservice.StateMachines.Action;
3190
import com.uber.cadence.internal.testservice.StateMachines.ActivityTaskData;
@@ -45,7 +104,14 @@
45104
import java.time.Instant;
46105
import java.time.ZoneOffset;
47106
import java.time.ZonedDateTime;
48-
import java.util.*;
107+
import java.util.ArrayList;
108+
import java.util.HashMap;
109+
import java.util.List;
110+
import java.util.Map;
111+
import java.util.Objects;
112+
import java.util.Optional;
113+
import java.util.OptionalLong;
114+
import java.util.UUID;
49115
import java.util.concurrent.CompletableFuture;
50116
import java.util.concurrent.ConcurrentHashMap;
51117
import java.util.concurrent.ExecutionException;
@@ -90,6 +156,7 @@ void apply(RequestContext ctx)
90156
private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
91157
new ConcurrentHashMap<>();
92158
private final Map<String, PollForDecisionTaskResponse> queryRequests = new ConcurrentHashMap<>();
159+
private final Optional<String> continuedExecutionRunId;
93160
public StickyExecutionAttributes stickyExecutionAttributes;
94161

95162
/**
@@ -104,11 +171,13 @@ void apply(RequestContext ctx)
104171
byte[] lastCompletionResult,
105172
Optional<TestWorkflowMutableState> parent,
106173
OptionalLong parentChildInitiatedEventId,
174+
Optional<String> continuedExecutionRunId,
107175
TestWorkflowService service,
108176
TestWorkflowStore store) {
109177
this.startRequest = startRequest;
110178
this.parent = parent;
111179
this.parentChildInitiatedEventId = parentChildInitiatedEventId;
180+
this.continuedExecutionRunId = continuedExecutionRunId;
112181
this.service = service;
113182
String runId = UUID.randomUUID().toString();
114183
this.executionId =
@@ -121,7 +190,10 @@ void apply(RequestContext ctx)
121190
retryState,
122191
backoffStartIntervalInSeconds,
123192
startRequest.getCronSchedule(),
124-
lastCompletionResult);
193+
lastCompletionResult,
194+
runId, // Test service doesn't support reset. Thus originalRunId is always the same as
195+
// runId.
196+
continuedExecutionRunId);
125197
this.workflow = StateMachines.newWorkflowStateMachine(data);
126198
}
127199

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,82 @@
1717

1818
package com.uber.cadence.internal.testservice;
1919

20-
import com.uber.cadence.*;
20+
import com.uber.cadence.BadRequestError;
21+
import com.uber.cadence.ClientVersionNotSupportedError;
22+
import com.uber.cadence.ClusterInfo;
23+
import com.uber.cadence.CountWorkflowExecutionsRequest;
24+
import com.uber.cadence.CountWorkflowExecutionsResponse;
25+
import com.uber.cadence.DeprecateDomainRequest;
26+
import com.uber.cadence.DescribeDomainRequest;
27+
import com.uber.cadence.DescribeDomainResponse;
28+
import com.uber.cadence.DescribeTaskListRequest;
29+
import com.uber.cadence.DescribeTaskListResponse;
30+
import com.uber.cadence.DescribeWorkflowExecutionRequest;
31+
import com.uber.cadence.DescribeWorkflowExecutionResponse;
32+
import com.uber.cadence.DomainAlreadyExistsError;
33+
import com.uber.cadence.DomainNotActiveError;
34+
import com.uber.cadence.EntityNotExistsError;
35+
import com.uber.cadence.GetSearchAttributesResponse;
36+
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
37+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
38+
import com.uber.cadence.InternalServiceError;
39+
import com.uber.cadence.LimitExceededError;
40+
import com.uber.cadence.ListArchivedWorkflowExecutionsRequest;
41+
import com.uber.cadence.ListArchivedWorkflowExecutionsResponse;
42+
import com.uber.cadence.ListClosedWorkflowExecutionsRequest;
43+
import com.uber.cadence.ListClosedWorkflowExecutionsResponse;
44+
import com.uber.cadence.ListDomainsRequest;
45+
import com.uber.cadence.ListDomainsResponse;
46+
import com.uber.cadence.ListOpenWorkflowExecutionsRequest;
47+
import com.uber.cadence.ListOpenWorkflowExecutionsResponse;
48+
import com.uber.cadence.ListTaskListPartitionsRequest;
49+
import com.uber.cadence.ListTaskListPartitionsResponse;
50+
import com.uber.cadence.ListWorkflowExecutionsRequest;
51+
import com.uber.cadence.ListWorkflowExecutionsResponse;
52+
import com.uber.cadence.PollForActivityTaskRequest;
53+
import com.uber.cadence.PollForActivityTaskResponse;
54+
import com.uber.cadence.PollForDecisionTaskRequest;
55+
import com.uber.cadence.PollForDecisionTaskResponse;
56+
import com.uber.cadence.QueryFailedError;
57+
import com.uber.cadence.QueryWorkflowRequest;
58+
import com.uber.cadence.QueryWorkflowResponse;
59+
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
60+
import com.uber.cadence.RecordActivityTaskHeartbeatRequest;
61+
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
62+
import com.uber.cadence.RegisterDomainRequest;
63+
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
64+
import com.uber.cadence.ResetStickyTaskListRequest;
65+
import com.uber.cadence.ResetStickyTaskListResponse;
66+
import com.uber.cadence.ResetWorkflowExecutionRequest;
67+
import com.uber.cadence.ResetWorkflowExecutionResponse;
68+
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
69+
import com.uber.cadence.RespondActivityTaskCanceledRequest;
70+
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
71+
import com.uber.cadence.RespondActivityTaskCompletedRequest;
72+
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
73+
import com.uber.cadence.RespondActivityTaskFailedRequest;
74+
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
75+
import com.uber.cadence.RespondDecisionTaskCompletedResponse;
76+
import com.uber.cadence.RespondDecisionTaskFailedRequest;
77+
import com.uber.cadence.RespondQueryTaskCompletedRequest;
78+
import com.uber.cadence.RetryPolicy;
79+
import com.uber.cadence.ServiceBusyError;
80+
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
81+
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
82+
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
83+
import com.uber.cadence.SignalWorkflowExecutionRequest;
84+
import com.uber.cadence.StartWorkflowExecutionRequest;
85+
import com.uber.cadence.StartWorkflowExecutionResponse;
86+
import com.uber.cadence.TerminateWorkflowExecutionRequest;
87+
import com.uber.cadence.UpdateDomainRequest;
88+
import com.uber.cadence.UpdateDomainResponse;
89+
import com.uber.cadence.WorkflowExecution;
90+
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
91+
import com.uber.cadence.WorkflowExecutionCloseStatus;
92+
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
93+
import com.uber.cadence.WorkflowExecutionFilter;
94+
import com.uber.cadence.WorkflowExecutionInfo;
95+
import com.uber.cadence.WorkflowIdReusePolicy;
2196
import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId;
2297
import com.uber.cadence.internal.testservice.TestWorkflowStore.WorkflowState;
2398
import com.uber.cadence.serviceclient.IWorkflowService;
@@ -175,7 +250,7 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
175250
Optional<RetryState> retryState = newRetryStateLocked(retryPolicy);
176251
return startWorkflowExecutionNoRunningCheckLocked(
177252
startRequest,
178-
false,
253+
Optional.empty(),
179254
retryState,
180255
backoffStartIntervalInSeconds,
181256
null,
@@ -213,7 +288,7 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
213288

214289
private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
215290
StartWorkflowExecutionRequest startRequest,
216-
boolean continuedAsNew,
291+
Optional<String> continuedExecutionRunId,
217292
Optional<RetryState> retryState,
218293
int backoffStartIntervalInSeconds,
219294
byte[] lastCompletionResult,
@@ -231,13 +306,14 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocke
231306
lastCompletionResult,
232307
parent,
233308
parentChildInitiatedEventId,
309+
continuedExecutionRunId,
234310
this,
235311
store);
236312
WorkflowExecution execution = mutableState.getExecutionId().getExecution();
237313
ExecutionId executionId = new ExecutionId(domain, execution);
238314
executionsByWorkflowId.put(workflowId, mutableState);
239315
executions.put(executionId, mutableState);
240-
mutableState.startWorkflow(continuedAsNew, signalWithStartSignal);
316+
mutableState.startWorkflow(continuedExecutionRunId.isPresent(), signalWithStartSignal);
241317
return new StartWorkflowExecutionResponse().setRunId(execution.getRunId());
242318
}
243319

@@ -537,7 +613,7 @@ public String continueAsNew(
537613
StartWorkflowExecutionResponse response =
538614
startWorkflowExecutionNoRunningCheckLocked(
539615
startRequest,
540-
true,
616+
Optional.of(executionId.getExecution().getRunId()),
541617
retryState,
542618
a.getBackoffStartIntervalInSeconds(),
543619
a.getLastCompletionResult(),

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,7 @@
1818
package com.uber.cadence.workflow;
1919

2020
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertFalse;
23-
import static org.junit.Assert.assertNotEquals;
24-
import static org.junit.Assert.assertNotNull;
25-
import static org.junit.Assert.assertNull;
26-
import static org.junit.Assert.assertTrue;
27-
import static org.junit.Assert.fail;
21+
import static org.junit.Assert.*;
2822

2923
import com.google.common.util.concurrent.UncheckedExecutionException;
3024
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
@@ -2652,6 +2646,40 @@ public void testChildWorkflowTimeout() {
26522646
assertEquals("ChildWorkflowTimedOutException", client.execute(taskList));
26532647
}
26542648

2649+
public static class TestParentWorkflowContinueAsNew implements TestWorkflow1 {
2650+
2651+
private final ITestChild child1 =
2652+
Workflow.newChildWorkflowStub(
2653+
ITestChild.class,
2654+
new ChildWorkflowOptions.Builder()
2655+
.setWorkflowIdReusePolicy(WorkflowIdReusePolicy.RejectDuplicate)
2656+
.build());
2657+
private final TestWorkflow1 self = Workflow.newContinueAsNewStub(TestWorkflow1.class);
2658+
2659+
@Override
2660+
public String execute(String arg) {
2661+
child1.execute("Hello", 0);
2662+
if (arg.length() > 0) {
2663+
self.execute(""); // continue as new
2664+
}
2665+
return "foo";
2666+
}
2667+
}
2668+
2669+
/** Reproduction of a bug when a child of continued as new workflow has the same UUID ID. */
2670+
@Test
2671+
public void testParentContinueAsNew() {
2672+
child2Id = UUID.randomUUID().toString();
2673+
startWorkerFor(TestParentWorkflowContinueAsNew.class, TestChild.class);
2674+
2675+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
2676+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200));
2677+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(60));
2678+
options.setTaskList(taskList);
2679+
TestWorkflow1 client = workflowClient.newWorkflowStub(TestWorkflow1.class, options.build());
2680+
assertEquals("foo", client.execute("not empty"));
2681+
}
2682+
26552683
private static String childReexecuteId = UUID.randomUUID().toString();
26562684

26572685
public interface WorkflowIdReusePolicyParent {

0 commit comments

Comments
 (0)