Skip to content

Commit 875f679

Browse files
committed
port over changeset from our fork
1 parent 26d092d commit 875f679

22 files changed

+342
-270
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ dependencies {
5353
errorproneJavac('com.google.errorprone:javac:9+181-r4173-1')
5454
errorprone('com.google.errorprone:error_prone_core:2.3.4')
5555

56-
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5'
56+
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.30'
5757
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
5858
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
5959
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
@@ -62,6 +62,7 @@ dependencies {
6262
compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0'
6363
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
6464
compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
65+
compile group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: '1.1.0'
6566

6667
testCompile group: 'junit', name: 'junit', version: '4.12'
6768
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'

src/main/java/com/uber/cadence/client/WorkflowOptions.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import com.uber.cadence.common.MethodRetry;
3131
import com.uber.cadence.common.RetryOptions;
3232
import com.uber.cadence.context.ContextPropagator;
33+
import com.uber.cadence.context.OpenTelemetryContextPropagator;
3334
import com.uber.cadence.internal.common.OptionsUtils;
3435
import com.uber.cadence.workflow.WorkflowMethod;
3536
import java.time.Duration;
37+
import java.util.ArrayList;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.Objects;
@@ -66,6 +68,7 @@ public static WorkflowOptions merge(
6668
.setMemo(o.getMemo())
6769
.setSearchAttributes(o.getSearchAttributes())
6870
.setContextPropagators(o.getContextPropagators())
71+
.setDefaultContextPropagators(o.useDefaultContextPropagators)
6972
.validateBuildWithDefaults();
7073
}
7174

@@ -91,6 +94,8 @@ public static final class Builder {
9194

9295
private List<ContextPropagator> contextPropagators;
9396

97+
private Boolean useDefaultContextPropagators;
98+
9499
public Builder() {}
95100

96101
public Builder(WorkflowOptions o) {
@@ -107,6 +112,7 @@ public Builder(WorkflowOptions o) {
107112
this.memo = o.memo;
108113
this.searchAttributes = o.searchAttributes;
109114
this.contextPropagators = o.contextPropagators;
115+
this.useDefaultContextPropagators = o.useDefaultContextPropagators;
110116
}
111117

112118
/**
@@ -214,6 +220,13 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
214220
return this;
215221
}
216222

223+
/** Specifies that the default context propagators should not be used. */
224+
public Builder setDefaultContextPropagators(Boolean useDefaultContextPropagators) {
225+
this.useDefaultContextPropagators =
226+
(useDefaultContextPropagators == null || useDefaultContextPropagators);
227+
return this;
228+
}
229+
217230
public WorkflowOptions build() {
218231
return new WorkflowOptions(
219232
workflowId,
@@ -225,7 +238,8 @@ public WorkflowOptions build() {
225238
cronSchedule,
226239
memo,
227240
searchAttributes,
228-
contextPropagators);
241+
contextPropagators,
242+
useDefaultContextPropagators);
229243
}
230244

231245
/**
@@ -261,6 +275,20 @@ public WorkflowOptions validateBuildWithDefaults() {
261275
cron.validate();
262276
}
263277

278+
if (useDefaultContextPropagators == null || useDefaultContextPropagators) {
279+
// Add OpenTelemetry propagator if not already present.
280+
if (contextPropagators != null) {
281+
contextPropagators = new ArrayList(contextPropagators);
282+
} else {
283+
contextPropagators = new ArrayList<>();
284+
}
285+
286+
OpenTelemetryContextPropagator otelPropagator = new OpenTelemetryContextPropagator();
287+
if (!contextPropagators.contains(otelPropagator)) {
288+
contextPropagators.add(otelPropagator);
289+
}
290+
}
291+
264292
return new WorkflowOptions(
265293
workflowId,
266294
policy,
@@ -272,7 +300,8 @@ public WorkflowOptions validateBuildWithDefaults() {
272300
cronSchedule,
273301
memo,
274302
searchAttributes,
275-
contextPropagators);
303+
contextPropagators,
304+
useDefaultContextPropagators);
276305
}
277306
}
278307

@@ -296,6 +325,8 @@ public WorkflowOptions validateBuildWithDefaults() {
296325

297326
private List<ContextPropagator> contextPropagators;
298327

328+
private Boolean useDefaultContextPropagators;
329+
299330
private WorkflowOptions(
300331
String workflowId,
301332
WorkflowIdReusePolicy workflowIdReusePolicy,
@@ -306,7 +337,8 @@ private WorkflowOptions(
306337
String cronSchedule,
307338
Map<String, Object> memo,
308339
Map<String, Object> searchAttributes,
309-
List<ContextPropagator> contextPropagators) {
340+
List<ContextPropagator> contextPropagators,
341+
Boolean useDefaultContextPropagators) {
310342
this.workflowId = workflowId;
311343
this.workflowIdReusePolicy = workflowIdReusePolicy;
312344
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
@@ -317,6 +349,7 @@ private WorkflowOptions(
317349
this.memo = memo;
318350
this.searchAttributes = searchAttributes;
319351
this.contextPropagators = contextPropagators;
352+
this.useDefaultContextPropagators = useDefaultContextPropagators;
320353
}
321354

322355
public String getWorkflowId() {
@@ -373,7 +406,9 @@ public boolean equals(Object o) {
373406
&& Objects.equals(cronSchedule, that.cronSchedule)
374407
&& Objects.equals(memo, that.memo)
375408
&& Objects.equals(searchAttributes, that.searchAttributes)
376-
&& Objects.equals(contextPropagators, that.contextPropagators);
409+
&& Objects.equals(contextPropagators, that.contextPropagators)
410+
&& (useDefaultContextPropagators == null || useDefaultContextPropagators)
411+
== (that.useDefaultContextPropagators == null || that.useDefaultContextPropagators);
377412
}
378413

379414
@Override
@@ -388,7 +423,8 @@ public int hashCode() {
388423
cronSchedule,
389424
memo,
390425
searchAttributes,
391-
contextPropagators);
426+
contextPropagators,
427+
useDefaultContextPropagators);
392428
}
393429

394430
@Override
@@ -418,6 +454,8 @@ public String toString() {
418454
+ searchAttributes
419455
+ ", contextPropagators='"
420456
+ contextPropagators
457+
+ ", useDefaultContextPropagators='"
458+
+ useDefaultContextPropagators
421459
+ '\''
422460
+ '}';
423461
}

src/main/java/com/uber/cadence/context/ContextPropagator.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
* Context Propagators are used to propagate information from workflow to activity, workflow to
2424
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
2525
*
26+
* <p>It is important to note that all threads share one ContextPropagator instance, so your
27+
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
28+
*
2629
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
2730
* with a given prefix along the code path looks like this:
2831
*
@@ -136,4 +139,31 @@ public interface ContextPropagator {
136139

137140
/** Sets the current context */
138141
void setCurrentContext(Object context);
142+
143+
/**
144+
* This is a lifecycle method, called after the context has been propagated to the
145+
* workflow/activity thread but the workflow/activity has not yet started.
146+
*/
147+
default void setUp() {
148+
// No-op
149+
}
150+
151+
/**
152+
* This is a lifecycle method, called after the workflow/activity has completed. If the method
153+
* finished without exception, {@code successful} will be true. Otherwise, it will be false and
154+
* {@link #onError(Throwable)} will have already been called.
155+
*/
156+
default void finish() {
157+
// No-op
158+
}
159+
160+
/**
161+
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
162+
* exception. {@link #finish()} is called after this method.
163+
*
164+
* @param t The unhandled exception that caused the workflow/activity to terminate
165+
*/
166+
default void onError(Throwable t) {
167+
// No-op
168+
}
139169
}

src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.function.Supplier;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

28-
/** This class holds the current set of context propagators */
30+
/** This class holds the current set of context propagators. */
2931
public class ContextThreadLocal {
3032

33+
private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);
34+
3135
private static WorkflowThreadLocal<List<ContextPropagator>> contextPropagators =
3236
WorkflowThreadLocal.withInitial(
3337
new Supplier<List<ContextPropagator>>() {
@@ -37,7 +41,7 @@ public List<ContextPropagator> get() {
3741
}
3842
});
3943

40-
/** Sets the list of context propagators for the thread */
44+
/** Sets the list of context propagators for the thread. */
4145
public static void setContextPropagators(List<ContextPropagator> propagators) {
4246
if (propagators == null || propagators.isEmpty()) {
4347
return;
@@ -57,6 +61,11 @@ public static Map<String, Object> getCurrentContextForPropagation() {
5761
return contextData;
5862
}
5963

64+
/**
65+
* Injects the context data into the thread for each configured context propagator.
66+
*
67+
* @param contextData The context data received from the server.
68+
*/
6069
public static void propagateContextToCurrentThread(Map<String, Object> contextData) {
6170
if (contextData == null || contextData.isEmpty()) {
6271
return;
@@ -67,4 +76,44 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
6776
}
6877
}
6978
}
79+
80+
/** Calls {@link ContextPropagator#setUp()} for each propagator. */
81+
public static void setUpContextPropagators() {
82+
for (ContextPropagator propagator : contextPropagators.get()) {
83+
try {
84+
propagator.setUp();
85+
} catch (Throwable t) {
86+
// Don't let an error in one propagator block the others
87+
log.error("Error calling setUp() on a contextpropagator", t);
88+
}
89+
}
90+
}
91+
92+
/**
93+
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator.
94+
*
95+
* @param t The Throwable that caused the workflow/activity to finish.
96+
*/
97+
public static void onErrorContextPropagators(Throwable t) {
98+
for (ContextPropagator propagator : contextPropagators.get()) {
99+
try {
100+
propagator.onError(t);
101+
} catch (Throwable t1) {
102+
// Don't let an error in one propagator block the others
103+
log.error("Error calling onError() on a contextpropagator", t1);
104+
}
105+
}
106+
}
107+
108+
/** Calls {@link ContextPropagator#finish()} for each propagator. */
109+
public static void finishContextPropagators() {
110+
for (ContextPropagator propagator : contextPropagators.get()) {
111+
try {
112+
propagator.finish();
113+
} catch (Throwable t) {
114+
// Don't let an error in one propagator block the others
115+
log.error("Error calling finish() on a contextpropagator", t);
116+
}
117+
}
118+
}
70119
}

src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.stream.Collectors;
2627

2728
final class WorkflowContext {
2829

@@ -166,7 +167,18 @@ Map<String, Object> getPropagatedContexts() {
166167

167168
Map<String, Object> contextData = new HashMap<>();
168169
for (ContextPropagator propagator : contextPropagators) {
169-
contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
170+
// Only send the context propagator the fields that belong to them
171+
// Change the map from MyPropagator:foo -> bar to foo -> bar
172+
Map<String, byte[]> filteredData =
173+
headerData
174+
.entrySet()
175+
.stream()
176+
.filter(e -> e.getKey().startsWith(propagator.getName()))
177+
.collect(
178+
Collectors.toMap(
179+
e -> e.getKey().substring(propagator.getName().length() + 1),
180+
Map.Entry::getValue));
181+
contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
170182
}
171183

172184
return contextData;

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.function.Consumer;
7373
import java.util.function.Function;
7474
import java.util.function.Supplier;
75+
import java.util.stream.Collectors;
7576
import org.slf4j.Logger;
7677
import org.slf4j.LoggerFactory;
7778

@@ -449,7 +450,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
449450
}
450451
Map<String, byte[]> result = new HashMap<>();
451452
for (ContextPropagator propagator : contextPropagators) {
452-
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
453+
// Get the serialized context from the propagator
454+
Map<String, byte[]> serializedContext =
455+
propagator.serializeContext(propagator.getCurrentContext());
456+
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
457+
Map<String, byte[]> namespacedSerializedContext =
458+
serializedContext
459+
.entrySet()
460+
.stream()
461+
.collect(
462+
Collectors.toMap(
463+
e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
464+
result.putAll(namespacedSerializedContext);
453465
}
454466
return result;
455467
}

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@
3939
import java.lang.reflect.Type;
4040
import java.time.Duration;
4141
import java.util.Objects;
42-
import java.util.concurrent.Executors;
43-
import java.util.concurrent.ScheduledExecutorService;
44-
import java.util.concurrent.ThreadPoolExecutor;
45-
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.*;
4643
import java.util.function.Consumer;
4744
import java.util.function.Function;
4845

@@ -59,6 +56,7 @@ public class SyncWorkflowWorker
5956
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
6057
private SuspendableWorker ldaWorker;
6158
private POJOActivityTaskHandler ldaTaskHandler;
59+
private final IWorkflowService service;
6260

6361
public SyncWorkflowWorker(
6462
IWorkflowService service,
@@ -74,6 +72,7 @@ public SyncWorkflowWorker(
7472
ThreadPoolExecutor workflowThreadPool) {
7573
Objects.requireNonNull(workflowThreadPool);
7674
this.dataConverter = workflowOptions.getDataConverter();
75+
this.service = service;
7776

7877
factory =
7978
new POJOWorkflowImplementationFactory(
@@ -252,4 +251,8 @@ public <R> R queryWorkflowExecution(
252251
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
253252
workflowWorker.accept(pollForDecisionTaskResponse);
254253
}
254+
255+
public CompletableFuture<Boolean> isHealthy() {
256+
return service.isHealthy();
257+
}
255258
}

0 commit comments

Comments
 (0)