Skip to content

Commit 0e059ab

Browse files
authored
Added WorkflowInterceptor (#137)
* Switched continueAsNew to ContinueAsNewOptions * Added Workflow.continueAsNew * Updated version in README * Added WorkflowInterceptor interface * Migrated SyncDecisionContext to implement WorkflowInterceptor * Fully migrated WorkflowInternal to use WorkflowInterceptor interface * Threaded WorkflowInterceptor from Worker options * Renamed WorkflowInterceptor.signalWorkflow to signalExternalWorkflow. Added tracing intercetor * Added unit testing of the interceptor
1 parent cb780da commit 0e059ab

38 files changed

+1104
-317
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Add *cadence-client* as a dependency to your *pom.xml*:
2929
<dependency>
3030
<groupId>com.uber</groupId>
3131
<artifactId>cadence-client</artifactId>
32-
<version>0.1.0</version>
32+
<version>0.2.0-SPANPSHOT</version>
3333
</dependency>
3434

3535
# Overview

src/main/java/com/uber/cadence/activity/ActivityOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package com.uber.cadence.activity;
1919

20-
import static com.uber.cadence.internal.common.InternalUtils.roundUpToSeconds;
20+
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
2121

2222
import com.uber.cadence.common.RetryOptions;
2323
import java.time.Duration;

src/main/java/com/uber/cadence/client/WorkflowOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package com.uber.cadence.client;
1919

20-
import static com.uber.cadence.internal.common.InternalUtils.roundUpToSeconds;
20+
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
2121

2222
import com.uber.cadence.ChildPolicy;
2323
import com.uber.cadence.WorkflowIdReusePolicy;

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@
1919

2020
import com.uber.cadence.workflow.WorkflowMethod;
2121
import java.lang.reflect.Method;
22-
import java.time.Duration;
2322

2423
/** Utility functions shared by the implementation code. */
2524
public final class InternalUtils {
2625

27-
public static final float SECOND = 1000f;
28-
2926
/**
3027
* Used to construct default name of an activity or workflow type from a method it implements.
3128
*
@@ -67,29 +64,6 @@ public static Method getWorkflowMethod(Class<?> workflowInterface) {
6764
return result;
6865
}
6966

70-
/**
71-
* Convert milliseconds to seconds rounding up. Used by timers to ensure that they never fire
72-
* earlier than requested.
73-
*/
74-
public static Duration roundUpToSeconds(Duration duration, Duration defaultValue) {
75-
if (duration == null) {
76-
return defaultValue;
77-
}
78-
return roundUpToSeconds(duration);
79-
}
80-
81-
/**
82-
* Round durations to seconds rounding up. As all timeouts and timers resolution is in seconds
83-
* ensures that nothing times out or fires before the requested time.
84-
*/
85-
public static Duration roundUpToSeconds(Duration duration) {
86-
if (duration == null) {
87-
return Duration.ZERO;
88-
}
89-
Duration result = Duration.ofMillis((long) (Math.ceil(duration.toMillis() / SECOND) * SECOND));
90-
return result;
91-
}
92-
9367
/** Prohibit instantiation */
9468
private InternalUtils() {}
9569
}

src/main/java/com/uber/cadence/internal/common/OptionsUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
public final class OptionsUtils {
2424

2525
public static final Duration DEFAULT_TASK_START_TO_CLOSE_TIMEOUT = Duration.ofSeconds(10);
26+
public static final float SECOND = 1000f;
2627

2728
/** Merges value from annotation and option. Option value takes precedence. */
2829
public static <G> G merge(G annotation, G options, Class<G> type) {
@@ -51,6 +52,29 @@ public static Duration merge(long aSeconds, Duration o) {
5152
return aSeconds == 0 ? null : Duration.ofSeconds(aSeconds);
5253
}
5354

55+
/**
56+
* Convert milliseconds to seconds rounding up. Used by timers to ensure that they never fire
57+
* earlier than requested.
58+
*/
59+
public static Duration roundUpToSeconds(Duration duration, Duration defaultValue) {
60+
if (duration == null) {
61+
return defaultValue;
62+
}
63+
return roundUpToSeconds(duration);
64+
}
65+
66+
/**
67+
* Round durations to seconds rounding up. As all timeouts and timers resolution is in seconds
68+
* ensures that nothing times out or fires before the requested time.
69+
*/
70+
public static Duration roundUpToSeconds(Duration duration) {
71+
if (duration == null) {
72+
return Duration.ZERO;
73+
}
74+
Duration result = Duration.ofMillis((long) (Math.ceil(duration.toMillis() / SECOND) * SECOND));
75+
return result;
76+
}
77+
5478
/** Prohibits instantiation. */
5579
private OptionsUtils() {}
5680
}

src/main/java/com/uber/cadence/internal/replay/ContinueAsNewWorkflowExecutionParameters.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,15 @@ public final class ContinueAsNewWorkflowExecutionParameters {
2727
private String taskList;
2828
private int taskStartToCloseTimeoutSeconds;
2929
private ChildPolicy childPolicy;
30+
private String workflowType;
3031

31-
public ContinueAsNewWorkflowExecutionParameters() {}
32+
public void setWorkflowType(String workflowType) {
33+
this.workflowType = workflowType;
34+
}
35+
36+
public String getWorkflowType() {
37+
return workflowType;
38+
}
3239

3340
public int getExecutionStartToCloseTimeoutSeconds() {
3441
return executionStartToCloseTimeoutSeconds;

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.WorkflowType;
22+
import com.uber.cadence.workflow.Promise;
2223
import java.time.Duration;
2324
import java.util.function.BiConsumer;
2425
import java.util.function.Consumer;
@@ -87,7 +88,7 @@ Consumer<Exception> startChildWorkflow(
8788
Consumer<Exception> signalWorkflowExecution(
8889
SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback);
8990

90-
void requestCancelWorkflowExecution(WorkflowExecution execution);
91+
Promise<Void> requestCancelWorkflowExecution(WorkflowExecution execution);
9192

9293
void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters parameters);
9394

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import com.uber.cadence.WorkflowExecution;
2525
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2626
import com.uber.cadence.WorkflowType;
27+
import com.uber.cadence.workflow.Promise;
28+
import com.uber.cadence.workflow.Workflow;
2729
import java.time.Duration;
2830
import java.util.function.BiConsumer;
2931
import java.util.function.Consumer;
3032

31-
final class DecisionContextImpl implements DecisionContext, HistoryEventHandler {
33+
public final class DecisionContextImpl implements DecisionContext, HistoryEventHandler {
3234

3335
private final ActivityDecisionContext activityClient;
3436

@@ -135,8 +137,10 @@ public Consumer<Exception> signalWorkflowExecution(
135137
}
136138

137139
@Override
138-
public void requestCancelWorkflowExecution(WorkflowExecution execution) {
140+
public Promise<Void> requestCancelWorkflowExecution(WorkflowExecution execution) {
139141
workflowClient.requestCancelWorkflowExecution(execution);
142+
// TODO: Make promise return success or failure of the cancellation request.
143+
return Workflow.newPromise(null);
140144
}
141145

142146
@Override

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.uber.cadence.TimerCanceledEventAttributes;
5050
import com.uber.cadence.TimerStartedEventAttributes;
5151
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
52+
import com.uber.cadence.WorkflowType;
5253
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
5354
import com.uber.cadence.internal.worker.WorkflowExecutionException;
5455
import java.nio.charset.StandardCharsets;
@@ -335,8 +336,13 @@ void continueAsNewWorkflowExecution(ContinueAsNewWorkflowExecutionParameters con
335336
task.getHistory().getEvents().get(0).getWorkflowExecutionStartedEventAttributes();
336337
ContinueAsNewWorkflowExecutionDecisionAttributes attributes =
337338
new ContinueAsNewWorkflowExecutionDecisionAttributes();
338-
attributes.setWorkflowType(task.getWorkflowType());
339339
attributes.setInput(continueParameters.getInput());
340+
String workflowType = continueParameters.getWorkflowType();
341+
if (workflowType != null && !workflowType.isEmpty()) {
342+
attributes.setWorkflowType(new WorkflowType().setName(workflowType));
343+
} else {
344+
attributes.setWorkflowType(task.getWorkflowType());
345+
}
340346
int executionStartToClose = continueParameters.getExecutionStartToCloseTimeoutSeconds();
341347
if (executionStartToClose == 0) {
342348
executionStartToClose = startedEvent.getExecutionStartToCloseTimeoutSeconds();

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.uber.cadence.TimerStartedEventAttributes;
2525
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
2626
import com.uber.cadence.WorkflowQuery;
27-
import com.uber.cadence.internal.common.InternalUtils;
27+
import com.uber.cadence.internal.common.OptionsUtils;
2828
import com.uber.cadence.internal.worker.WorkflowExecutionException;
2929
import com.uber.cadence.workflow.Functions;
3030
import java.time.Duration;
@@ -278,7 +278,7 @@ private void updateTimers() {
278278
// Round up to the nearest second as we don't want to deliver a timer
279279
// earlier than requested.
280280
long delaySeconds =
281-
InternalUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
281+
OptionsUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
282282
if (timerCancellationHandler != null) {
283283
timerCancellationHandler.accept(null);
284284
timerCancellationHandler = null;

0 commit comments

Comments
 (0)