Skip to content

Commit 93e3493

Browse files
authored
Random UUID (#169)
* Added Workflow.randomUUID * Added Workflow.randomUUID * Added Workflow.newRandom * Added testUUIDAndRandom unit test
1 parent 9eac91b commit 93e3493

14 files changed

+183
-23
lines changed

src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,13 @@ Consumer<Exception> scheduleActivityTask(
9898
(int) parameters.getScheduleToStartTimeoutSeconds());
9999
attributes.setStartToCloseTimeoutSeconds((int) parameters.getStartToCloseTimeoutSeconds());
100100

101+
// attributes.setTaskPriority(InternalUtils.taskPriorityToString(parameters.getTaskPriority()));
102+
String activityId = parameters.getActivityId();
103+
if (activityId == null) {
104+
activityId = String.valueOf(decisions.getAndIncrementNextId());
105+
}
106+
attributes.setActivityId(activityId);
107+
101108
String taskList = parameters.getTaskList();
102109
if (taskList != null && !taskList.isEmpty()) {
103110
TaskList tl = new TaskList();

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback)
107107
final OpenRequestInfo<?, Long> context = new OpenRequestInfo<>(firingTime);
108108
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes();
109109
timer.setStartToFireTimeoutSeconds(delaySeconds);
110+
timer.setTimerId(String.valueOf(decisions.getAndIncrementNextId()));
110111
long startEventId = decisions.startTimer(timer);
111112
context.setCompletionHandle((ctx, e) -> callback.accept(e));
112113
scheduledTimers.put(startEventId, context);

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.uber.m3.tally.Scope;
2727
import java.time.Duration;
2828
import java.util.Optional;
29+
import java.util.Random;
30+
import java.util.UUID;
2931
import java.util.function.BiConsumer;
3032
import java.util.function.Consumer;
3133

@@ -147,9 +149,14 @@ Optional<byte[]> mutableSideEffect(
147149
*/
148150
int getVersion(String changeID, DataConverter dataConverter, int minSupported, int maxSupported);
149151

152+
Random newRandom();
153+
150154
/** @return scope to be used for metrics reporting. */
151155
Scope getMetricsScope();
152156

153157
/** @return whether we do logging during decision replay. */
154158
boolean getEnableLoggingInReplay();
159+
160+
/** @return replay safe UUID */
161+
UUID randomUUID();
155162
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.uber.m3.tally.Scope;
3333
import java.time.Duration;
3434
import java.util.Optional;
35+
import java.util.Random;
36+
import java.util.UUID;
3537
import java.util.function.BiConsumer;
3638
import java.util.function.Consumer;
3739

@@ -71,6 +73,16 @@ public boolean getEnableLoggingInReplay() {
7173
return enableLoggingInReplay;
7274
}
7375

76+
@Override
77+
public UUID randomUUID() {
78+
return workflowClient.randomUUID();
79+
}
80+
81+
@Override
82+
public Random newRandom() {
83+
return workflowClient.newRandom();
84+
}
85+
7486
@Override
7587
public Scope getMetricsScope() {
7688
return metricsScope;

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
5555
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
5656
import com.uber.cadence.internal.worker.WorkflowExecutionException;
57-
import java.nio.charset.StandardCharsets;
5857
import java.util.ArrayList;
5958
import java.util.Arrays;
6059
import java.util.HashMap;
@@ -91,6 +90,8 @@ class DecisionsHelper {
9190
*/
9291
private long nextDecisionEventId;
9392

93+
private long idCounter;
94+
9495
private DecisionEvents decisionEvents;
9596

9697
/** Use access-order to ensure that decisions are emitted in order of their creation */
@@ -116,7 +117,6 @@ long scheduleActivityTask(ScheduleActivityTaskDecisionAttributes schedule) {
116117
addAllMissingVersionMarker(false, Optional.empty());
117118

118119
long nextDecisionEventId = getNextDecisionEventId();
119-
schedule.setActivityId(getNextId());
120120
DecisionId decisionId = new DecisionId(DecisionTarget.ACTIVITY, nextDecisionEventId);
121121
activityIdToScheduledEventId.put(schedule.getActivityId(), nextDecisionEventId);
122122
addDecision(decisionId, new ActivityDecisionStateMachine(decisionId, schedule));
@@ -194,14 +194,9 @@ boolean handleRequestCancelActivityTaskFailed(HistoryEvent event) {
194194
return decision.isDone();
195195
}
196196

197-
long startChildWorkflowExecution(
198-
StartChildWorkflowExecutionDecisionAttributes childWorkflow, String runId) {
197+
long startChildWorkflowExecution(StartChildWorkflowExecutionDecisionAttributes childWorkflow) {
199198
addAllMissingVersionMarker(false, Optional.empty());
200199

201-
if (childWorkflow.getWorkflowId() == null) {
202-
childWorkflow.setWorkflowId(runId + ":" + getNextId());
203-
}
204-
205200
long nextDecisionEventId = getNextDecisionEventId();
206201
DecisionId decisionId = new DecisionId(DecisionTarget.CHILD_WORKFLOW, nextDecisionEventId);
207202
addDecision(decisionId, new ChildWorkflowDecisionStateMachine(decisionId, childWorkflow));
@@ -269,8 +264,6 @@ void handleRequestCancelExternalWorkflowExecutionFailed(HistoryEvent event) {
269264
long signalExternalWorkflowExecution(SignalExternalWorkflowExecutionDecisionAttributes signal) {
270265
addAllMissingVersionMarker(false, Optional.empty());
271266

272-
signal.setControl(getNextId().getBytes(StandardCharsets.UTF_8));
273-
274267
long nextDecisionEventId = getNextDecisionEventId();
275268
DecisionId decisionId =
276269
new DecisionId(DecisionTarget.SIGNAL_EXTERNAL_WORKFLOW, nextDecisionEventId);
@@ -306,7 +299,6 @@ long startTimer(StartTimerDecisionAttributes request) {
306299

307300
long startEventId = getNextDecisionEventId();
308301
DecisionId decisionId = new DecisionId(DecisionTarget.TIMER, startEventId);
309-
request.setTimerId(String.valueOf(getNextId()));
310302
addDecision(decisionId, new TimerDecisionStateMachine(decisionId, request));
311303
return startEventId;
312304
}
@@ -687,11 +679,8 @@ private DecisionStateMachine getDecision(DecisionId decisionId) {
687679
return result;
688680
}
689681

690-
private String getNextId() {
691-
if (nextDecisionEventId == 0) {
692-
throw new IllegalStateException("nextDecisionEventId is not set");
693-
}
694-
return String.valueOf(nextDecisionEventId);
682+
String getAndIncrementNextId() {
683+
return String.valueOf(idCounter++);
695684
}
696685

697686
HistoryEvent getDecisionEvent(long eventId) {

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,12 @@
3939
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
4040
import com.uber.cadence.workflow.SignalExternalWorkflowException;
4141
import com.uber.cadence.workflow.StartChildWorkflowFailedException;
42+
import java.nio.charset.StandardCharsets;
4243
import java.util.HashMap;
4344
import java.util.Map;
4445
import java.util.Objects;
46+
import java.util.Random;
47+
import java.util.UUID;
4548
import java.util.concurrent.CancellationException;
4649
import java.util.function.BiConsumer;
4750
import java.util.function.Consumer;
@@ -95,7 +98,11 @@ Consumer<Exception> startChildWorkflow(
9598
final StartChildWorkflowExecutionDecisionAttributes attributes =
9699
new StartChildWorkflowExecutionDecisionAttributes();
97100
attributes.setWorkflowType(parameters.getWorkflowType());
98-
attributes.setWorkflowId(parameters.getWorkflowId());
101+
String workflowId = parameters.getWorkflowId();
102+
if (workflowId == null) {
103+
workflowId = randomUUID().toString();
104+
}
105+
attributes.setWorkflowId(workflowId);
99106
if (parameters.getDomain() == null) {
100107
// Could be removed as soon as server allows null for domain.
101108
attributes.setDomain(workflowContext.getDomain());
@@ -135,9 +142,7 @@ Consumer<Exception> startChildWorkflow(
135142
}
136143
attributes.setTaskList(tl);
137144
attributes.setWorkflowIdReusePolicy(parameters.getWorkflowIdReusePolicy());
138-
long initiatedEventId =
139-
decisions.startChildWorkflowExecution(
140-
attributes, workflowContext.getWorkflowExecution().getRunId());
145+
long initiatedEventId = decisions.startChildWorkflowExecution(attributes);
141146
final OpenChildWorkflowRequestInfo context =
142147
new OpenChildWorkflowRequestInfo(executionCallback);
143148
context.setCompletionHandle(callback);
@@ -155,6 +160,8 @@ Consumer<Exception> signalWorkflowExecution(
155160
} else {
156161
attributes.setDomain(parameters.getDomain());
157162
}
163+
String signalId = decisions.getAndIncrementNextId();
164+
attributes.setControl(signalId.getBytes(StandardCharsets.UTF_8));
158165
attributes.setSignalName(parameters.getSignalName());
159166
attributes.setInput(parameters.getInput());
160167
WorkflowExecution execution = new WorkflowExecution();
@@ -195,6 +202,19 @@ void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters continue
195202
workflowContext.setContinueAsNewOnCompletion(continueParameters);
196203
}
197204

205+
/** Replay safe UUID */
206+
UUID randomUUID() {
207+
WorkflowExecution workflowExecution = workflowContext.getWorkflowExecution();
208+
String runId = workflowExecution.getRunId();
209+
String id = runId + ":" + decisions.getAndIncrementNextId();
210+
byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
211+
return UUID.nameUUIDFromBytes(bytes);
212+
}
213+
214+
Random newRandom() {
215+
return new Random(randomUUID().getLeastSignificantBits());
216+
}
217+
198218
void handleChildWorkflowExecutionCanceled(HistoryEvent event) {
199219
ChildWorkflowExecutionCanceledEventAttributes attributes =
200220
event.getChildWorkflowExecutionCanceledEventAttributes();

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.Optional;
45+
import java.util.Random;
4546
import java.util.Set;
47+
import java.util.UUID;
4648
import java.util.concurrent.ExecutorService;
4749
import java.util.concurrent.SynchronousQueue;
4850
import java.util.concurrent.ThreadPoolExecutor;
@@ -533,6 +535,11 @@ public int getVersion(
533535
throw new UnsupportedOperationException("not implemented");
534536
}
535537

538+
@Override
539+
public Random newRandom() {
540+
throw new UnsupportedOperationException("not implemented");
541+
}
542+
536543
@Override
537544
public Scope getMetricsScope() {
538545
throw new UnsupportedOperationException("not implemented");
@@ -542,5 +549,10 @@ public Scope getMetricsScope() {
542549
public boolean getEnableLoggingInReplay() {
543550
return false;
544551
}
552+
553+
@Override
554+
public UUID randomUUID() {
555+
throw new UnsupportedOperationException("not implemented");
556+
}
545557
}
546558
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import java.util.Map;
5555
import java.util.Objects;
5656
import java.util.Optional;
57+
import java.util.Random;
58+
import java.util.UUID;
5759
import java.util.concurrent.CancellationException;
5860
import java.util.concurrent.TimeUnit;
5961
import java.util.concurrent.atomic.AtomicReference;
@@ -427,6 +429,16 @@ public void registerQuery(
427429
});
428430
}
429431

432+
@Override
433+
public UUID randomUUID() {
434+
return context.randomUUID();
435+
}
436+
437+
@Override
438+
public Random newRandom() {
439+
return context.newRandom();
440+
}
441+
430442
public DataConverter getDataConverter() {
431443
return converter;
432444
}

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import java.nio.charset.StandardCharsets;
8888
import java.time.Duration;
8989
import java.util.Optional;
90+
import java.util.Random;
9091
import java.util.UUID;
9192
import java.util.concurrent.CancellationException;
9293
import java.util.concurrent.atomic.AtomicInteger;
@@ -187,6 +188,11 @@ public <R> WorkflowResult<R> executeChildWorkflow(
187188
throw new UnsupportedOperationException("not implemented");
188189
}
189190

191+
@Override
192+
public Random newRandom() {
193+
throw new UnsupportedOperationException("not implemented");
194+
}
195+
190196
@Override
191197
public Promise<Void> signalExternalWorkflow(
192198
WorkflowExecution execution, String signalName, Object[] args) {
@@ -246,6 +252,11 @@ public void registerQuery(
246252
throw new UnsupportedOperationException("not implemented");
247253
}
248254

255+
@Override
256+
public UUID randomUUID() {
257+
throw new UnsupportedOperationException("not implemented");
258+
}
259+
249260
private <T> T getReply(
250261
PollForActivityTaskResponse task,
251262
ActivityTaskHandler.Result response,

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.Collection;
5050
import java.util.List;
5151
import java.util.Optional;
52+
import java.util.Random;
53+
import java.util.UUID;
5254
import java.util.function.BiPredicate;
5355
import java.util.function.Supplier;
5456
import org.slf4j.Logger;
@@ -336,6 +338,14 @@ private static boolean isLoggingEnabledInReplay() {
336338
return getRootDecisionContext().isLoggingEnabledInReplay();
337339
}
338340

341+
public static UUID randomUUID() {
342+
return getRootDecisionContext().randomUUID();
343+
}
344+
345+
public static Random newRandom() {
346+
return getRootDecisionContext().newRandom();
347+
}
348+
339349
public static Logger getLogger(Class<?> clazz) {
340350
Logger logger = LoggerFactory.getLogger(clazz);
341351
return new ReplayAwareLogger(

0 commit comments

Comments
 (0)