Skip to content

Commit f9a6265

Browse files
authored
Added WorkflowLocal and WorkflowThreadLocal (#135)
* Added WorkflowLocal and WorkflowThreadLocal * Implemented unit test for WorkflowLocal and WorkflowThreadLocal
1 parent 9dd9c4a commit f9a6265

12 files changed

+301
-28
lines changed

src/main/java/com/uber/cadence/client/WorkflowException.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,22 @@ protected WorkflowException(
4040
private static String getMessage(
4141
String message, WorkflowExecution execution, Optional<String> workflowType) {
4242
StringBuilder result = new StringBuilder();
43-
result.append(message);
43+
if (message != null) {
44+
result.append(message);
45+
result.append(", ");
46+
}
4447
if (workflowType.isPresent()) {
45-
result.append(", WorkflowType=\"");
48+
result.append("WorkflowType=\"");
4649
result.append(workflowType.get());
50+
}
51+
if (execution != null) {
52+
if (result.length() > 0) {
53+
result.append("\", ");
54+
}
55+
result.append("WorkflowExecution=\"");
56+
result.append(execution);
4757
result.append("\"");
4858
}
49-
result.append(", WorkflowExecution=\"");
50-
result.append(execution);
5159
return result.toString();
5260
}
5361

src/main/java/com/uber/cadence/client/WorkflowServiceException.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,13 @@
1818
package com.uber.cadence.client;
1919

2020
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2122
import java.util.Optional;
2223

2324
public final class WorkflowServiceException extends WorkflowException {
2425

2526
public WorkflowServiceException(
2627
WorkflowExecution execution, Optional<String> workflowType, Throwable failure) {
27-
super(getMessage(execution, workflowType), execution, workflowType, failure);
28-
}
29-
30-
private static String getMessage(WorkflowExecution execution, Optional<String> workflowType) {
31-
StringBuilder result = new StringBuilder();
32-
if (workflowType.isPresent()) {
33-
result.append("WorkflowType=\"");
34-
result.append(workflowType.get());
35-
result.append("\", ");
36-
}
37-
result.append("WorkflowID=\"");
38-
result.append(execution.getWorkflowId());
39-
result.append("\", RunID=\"");
40-
result.append(execution.getRunId());
41-
return result.toString();
28+
super(null, execution, workflowType, CheckedExceptionWrapper.unwrap(failure));
4229
}
4330
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public WorkflowExecution startWorkflow(StartWorkflowExecutionParameters startPar
9393
} catch (WorkflowExecutionAlreadyStartedError e) {
9494
throw e;
9595
} catch (TException e) {
96-
throw new RuntimeException(e);
96+
throw CheckedExceptionWrapper.wrap(e);
9797
}
9898
WorkflowExecution execution = new WorkflowExecution();
9999
execution.setRunId(result.getRunId());
@@ -116,7 +116,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
116116
try {
117117
service.SignalWorkflowExecution(request);
118118
} catch (TException e) {
119-
throw new RuntimeException(e);
119+
throw CheckedExceptionWrapper.wrap(e);
120120
}
121121
}
122122

@@ -128,7 +128,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) {
128128
try {
129129
service.RequestCancelWorkflowExecution(request);
130130
} catch (TException e) {
131-
throw new RuntimeException(e);
131+
throw CheckedExceptionWrapper.wrap(e);
132132
}
133133
}
134134

@@ -169,7 +169,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term
169169
try {
170170
service.TerminateWorkflowExecution(request);
171171
} catch (TException e) {
172-
throw new RuntimeException(e);
172+
throw CheckedExceptionWrapper.wrap(e);
173173
}
174174
}
175175
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.Deque;
26+
import java.util.HashMap;
2627
import java.util.HashSet;
2728
import java.util.Iterator;
2829
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Optional;
2932
import java.util.Set;
3033
import java.util.concurrent.ExecutorService;
3134
import java.util.concurrent.SynchronousQueue;
@@ -51,13 +54,16 @@ private NamedRunnable(String name, Runnable runnable) {
5154
}
5255

5356
private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
54-
public static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
57+
static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
5558
private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();
5659

5760
private final Lock lock = new ReentrantLock();
5861
private final ExecutorService threadPool;
5962
private final SyncDecisionContext decisionContext;
6063
private final Deque<WorkflowThread> threads = new ArrayDeque<>(); // protected by lock
64+
// Values from RunnerLocalInternal
65+
private final Map<RunnerLocalInternal<?>, Object> runnerLocalMap = new HashMap<>();
66+
6167
private final List<WorkflowThread> threadsToAdd = Collections.synchronizedList(new ArrayList<>());
6268
private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
6369
private final Supplier<Long> clock;
@@ -128,7 +134,7 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
128134
rootWorkflowThread.start();
129135
}
130136

131-
public SyncDecisionContext getDecisionContext() {
137+
SyncDecisionContext getDecisionContext() {
132138
return decisionContext;
133139
}
134140

@@ -347,12 +353,12 @@ Lock getLock() {
347353
}
348354

349355
/** Register a promise that had failed but wasn't accessed yet. */
350-
public void registerFailedPromise(Promise promise) {
356+
void registerFailedPromise(Promise promise) {
351357
failedPromises.add(promise);
352358
}
353359

354360
/** Forget a failed promise as it was accessed. */
355-
public void forgetFailedPromise(Promise promise) {
361+
void forgetFailedPromise(Promise promise) {
356362
failedPromises.remove(promise);
357363
}
358364

@@ -368,4 +374,16 @@ private void checkWorkflowThreadOnly() {
368374
throw new Error("called from non workflow thread");
369375
}
370376
}
377+
378+
@SuppressWarnings("unchecked")
379+
<T> Optional<T> getRunnerLocal(RunnerLocalInternal<T> key) {
380+
if (!runnerLocalMap.containsKey(key)) {
381+
return Optional.empty();
382+
}
383+
return Optional.of((T) runnerLocalMap.get(key));
384+
}
385+
386+
<T> void setRunnerLocal(RunnerLocalInternal<T> key, T value) {
387+
runnerLocalMap.put(key, value);
388+
}
371389
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
import java.util.Optional;
21+
import java.util.function.Supplier;
22+
23+
public final class RunnerLocalInternal<T> {
24+
25+
public T get(Supplier<? extends T> supplier) {
26+
Optional<T> result =
27+
DeterministicRunnerImpl.currentThreadInternal().getRunner().getRunnerLocal(this);
28+
return result.orElse(supplier.get());
29+
}
30+
31+
public void set(T value) {
32+
DeterministicRunnerImpl.currentThreadInternal().getRunner().setRunnerLocal(this, value);
33+
}
34+
}

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ public void signal(String signalName, Object... input) {
9494
// TODO: Deal with signaling started workflow only, when requested
9595
// Commented out to support signaling workflows that called continue as new.
9696
// p.setRunId(execution.getRunId());
97-
genericClient.signalWorkflowExecution(p);
97+
try {
98+
genericClient.signalWorkflowExecution(p);
99+
} catch (Exception e) {
100+
throw new WorkflowServiceException(execution.get(), workflowType, e);
101+
}
98102
}
99103

100104
private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
@@ -125,6 +129,8 @@ private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
125129
WorkflowExecution execution =
126130
new WorkflowExecution().setWorkflowId(p.getWorkflowId()).setRunId(e.getRunId());
127131
throw new DuplicateWorkflowException(execution, workflowType.get(), e.getMessage());
132+
} catch (Exception e) {
133+
throw new WorkflowServiceException(execution.get(), workflowType, e);
128134
}
129135
return execution.get();
130136
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThread.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.uber.cadence.internal.sync.DeterministicRunnerImpl.currentThreadInternal;
2121

2222
import com.uber.cadence.workflow.CancellationScope;
23+
import java.util.Optional;
2324
import java.util.concurrent.CancellationException;
2425
import java.util.function.Supplier;
2526

@@ -108,4 +109,8 @@ static <R> void exit(R value) {
108109
}
109110

110111
<R> void exitThread(R value);
112+
113+
<T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value);
114+
115+
<T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key);
111116
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import com.uber.cadence.workflow.Promise;
2121
import java.io.PrintWriter;
2222
import java.io.StringWriter;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.Optional;
2326
import java.util.concurrent.CancellationException;
2427
import java.util.concurrent.ExecutionException;
2528
import java.util.concurrent.ExecutorService;
@@ -136,6 +139,7 @@ public void setName(String name) {
136139
private final RunnableWrapper task;
137140
private Thread thread;
138141
private Future<?> taskFuture;
142+
private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
139143

140144
/**
141145
* If not 0 then thread is blocked on a sleep (or on an operation with a timeout). The value is
@@ -363,6 +367,20 @@ public <R> void exitThread(R value) {
363367
throw new DestroyWorkflowThreadError("exit");
364368
}
365369

370+
@Override
371+
public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
372+
threadLocalMap.put(key, value);
373+
}
374+
375+
@SuppressWarnings("unchecked")
376+
@Override
377+
public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
378+
if (!threadLocalMap.containsKey(key)) {
379+
return Optional.empty();
380+
}
381+
return Optional.of((T) threadLocalMap.get(key));
382+
}
383+
366384
/** @return stack trace of the coroutine thread */
367385
@Override
368386
public String getStackTrace() {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
import java.util.Optional;
21+
import java.util.function.Supplier;
22+
23+
public final class WorkflowThreadLocalInternal<T> {
24+
25+
public T get(Supplier<? extends T> supplier) {
26+
Optional<T> result = DeterministicRunnerImpl.currentThreadInternal().getThreadLocal(this);
27+
return result.orElse(supplier.get());
28+
}
29+
30+
public void set(T value) {
31+
DeterministicRunnerImpl.currentThreadInternal().setThreadLocal(this, value);
32+
}
33+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow;
19+
20+
import com.uber.cadence.internal.sync.RunnerLocalInternal;
21+
import java.util.Objects;
22+
import java.util.function.Supplier;
23+
24+
/**
25+
* A value that is local to a single workflow execution. So it can act as a <i>global</i> variable
26+
* for the workflow code. For example the {@code Workflow.isSignalled()} static method returns the
27+
* correct value even if there are multiple workflows executing on the same machine simultaneously.
28+
* It would be invalid if the {@code signaled} was a {@code static boolean} variable.
29+
*
30+
* <pre>{@code
31+
* public class Workflow {
32+
*
33+
* private static final WorkflowLocal<Boolean> signaled = WorkflowLocal.withInitial(() -> false);
34+
*
35+
* public static boolean isSignalled() {
36+
* return signaled.get();
37+
* }
38+
*
39+
* public void signal() {
40+
* signaled.set(true);
41+
* }
42+
* }
43+
* }</pre>
44+
*
45+
* @see WorkflowThreadLocal for thread local that can be used inside workflow code.
46+
*/
47+
public final class WorkflowLocal<T> {
48+
49+
private final RunnerLocalInternal<T> impl = new RunnerLocalInternal<>();
50+
private Supplier<? extends T> supplier;
51+
52+
private WorkflowLocal(Supplier<? extends T> supplier) {
53+
this.supplier = Objects.requireNonNull(supplier);
54+
}
55+
56+
public WorkflowLocal() {
57+
this.supplier = () -> null;
58+
}
59+
60+
public static <S> WorkflowLocal<S> withInitial(Supplier<? extends S> supplier) {
61+
return new WorkflowLocal<>(supplier);
62+
}
63+
64+
public T get() {
65+
return impl.get(supplier);
66+
}
67+
68+
public void set(T value) {
69+
impl.set(value);
70+
}
71+
}

0 commit comments

Comments
 (0)