Skip to content

Commit a7ad8dd

Browse files
authored
Added ability to signal workflow from within a workflow (#107)
* Implemented workflow to workflow signalling * Switched callback signature from Throwable to Exception. Added signal unit tests * Fixed signal cancellation * Updated license rule * Updated license generation
1 parent 6545f59 commit a7ad8dd

21 files changed

+546
-180
lines changed

build.gradle

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ buildscript {
55
}
66

77
plugins {
8-
id "com.github.hierynomus.license" version "0.14.0"
8+
id 'net.minecrell.licenser' version '0.3'
99
}
1010

1111
repositories {
@@ -39,18 +39,20 @@ dependencies {
3939

4040
license {
4141
header rootProject.file('license-header.txt')
42-
mapping {
43-
java = 'SLASHSTAR_STYLE'
44-
}
45-
include 'src/**/*.java'
46-
strictCheck true
42+
exclude 'com/uber/cadence/*.java' // generated code
4743
}
4844

4945
compileJava {
5046
options.encoding = 'UTF-8'
5147
options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation"
5248
}
5349

50+
license {
51+
header = project.file('license-header.txt')
52+
exclude '**/com/uber/cadence/*.java'
53+
}
54+
55+
5456
compileTestJava {
5557
options.encoding = 'UTF-8'
5658
options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation"

src/main/java/com/uber/cadence/client/DuplicateWorkflowException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* <ul>There is successfully closed workflow with the same ID and the {@link WorkflowOptions#getWorkflowIdReusePolicy()} is
2828
* {@link com.uber.cadence.WorkflowIdReusePolicy#AllowDuplicateFailedOnly}.</ul>
2929
* <ul>Method annotated with {@link com.uber.cadence.workflow.WorkflowMethod} is called <i>more than once</i>
30-
* on a stub created through {@link com.uber.cadence.workflow.Workflow#newChildWorkflowStub(Class)}
30+
* on a stub created through {@link com.uber.cadence.workflow.Workflow#newWorkflowStub(Class)}
3131
* and the {@link WorkflowOptions#getWorkflowIdReusePolicy()} is
3232
* {@link com.uber.cadence.WorkflowIdReusePolicy#AllowDuplicate}</ul>
3333
* </li>

src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,19 @@
3535

3636
final class ActivityDecisionContext {
3737

38-
private final class ActivityCancellationHandler implements Consumer<Throwable> {
38+
private final class ActivityCancellationHandler implements Consumer<Exception> {
3939

4040
private final String activityId;
4141

42-
private final BiConsumer<byte[], RuntimeException> callback;
42+
private final BiConsumer<byte[], Exception> callback;
4343

44-
private ActivityCancellationHandler(String activityId, BiConsumer<byte[], RuntimeException> callaback) {
44+
private ActivityCancellationHandler(String activityId, BiConsumer<byte[], Exception> callaback) {
4545
this.activityId = activityId;
4646
this.callback = callaback;
4747
}
4848

4949
@Override
50-
public void accept(Throwable cause) {
50+
public void accept(Exception cause) {
5151
decisions.requestCancelActivityTask(activityId, () -> {
5252
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
5353
if (scheduled == null) {
@@ -66,7 +66,7 @@ public void accept(Throwable cause) {
6666
this.decisions = decisions;
6767
}
6868

69-
Consumer<Throwable> scheduleActivityTask(ExecuteActivityParameters parameters, BiConsumer<byte[], RuntimeException> callback) {
69+
Consumer<Exception> scheduleActivityTask(ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
7070
final OpenRequestInfo<byte[], ActivityType> context = new OpenRequestInfo<>(parameters.getActivityType());
7171
final ScheduleActivityTaskDecisionAttributes attributes = new ScheduleActivityTaskDecisionAttributes();
7272
attributes.setActivityType(parameters.getActivityType());
@@ -106,7 +106,7 @@ void handleActivityTaskCanceled(HistoryEvent event) {
106106
CancellationException e = new CancellationException();
107107
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
108108
if (scheduled != null) {
109-
BiConsumer<byte[], RuntimeException> completionHandle = scheduled.getCompletionCallback();
109+
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
110110
// It is OK to fail with subclass of CancellationException when cancellation requested.
111111
// It allows passing information about cancellation (details in this case) to the surrounding doCatch block
112112
completionHandle.accept(null, e);
@@ -121,7 +121,7 @@ void handleActivityTaskCompleted(HistoryEvent event) {
121121
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
122122
if (scheduled != null) {
123123
byte[] result = attributes.getResult();
124-
BiConsumer<byte[], RuntimeException> completionHandle = scheduled.getCompletionCallback();
124+
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
125125
completionHandle.accept(result, null);
126126
}
127127
}
@@ -137,7 +137,7 @@ void handleActivityTaskFailed(HistoryEvent event) {
137137
byte[] details = attributes.getDetails();
138138
ActivityTaskFailedException failure = new ActivityTaskFailedException(event.getEventId(),
139139
scheduled.getUserContext(), activityId, reason, details);
140-
BiConsumer<byte[], RuntimeException> completionHandle = scheduled.getCompletionCallback();
140+
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
141141
completionHandle.accept(null, failure);
142142
}
143143
}
@@ -153,7 +153,7 @@ void handleActivityTaskTimedOut(HistoryEvent event) {
153153
byte[] details = attributes.getDetails();
154154
ActivityTaskTimeoutException failure = new ActivityTaskTimeoutException(event.getEventId(),
155155
scheduled.getUserContext(), activityId, timeoutType, details);
156-
BiConsumer<byte[], RuntimeException> completionHandle = scheduled.getCompletionCallback();
156+
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
157157
completionHandle.accept(null, failure);
158158
}
159159
}

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*/
3838
final class ClockDecisionContext {
3939

40-
private final class TimerCancellationHandler implements Consumer<Throwable> {
40+
private final class TimerCancellationHandler implements Consumer<Exception> {
4141

4242
private final String timerId;
4343

@@ -46,10 +46,10 @@ private final class TimerCancellationHandler implements Consumer<Throwable> {
4646
}
4747

4848
@Override
49-
public void accept(Throwable reason) {
49+
public void accept(Exception reason) {
5050
decisions.cancelTimer(timerId, () -> {
5151
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
52-
BiConsumer<?, RuntimeException> context = scheduled.getCompletionCallback();
52+
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
5353
CancellationException exception = new CancellationException("Cancelled by request");
5454
exception.initCause(reason);
5555
context.accept(null, exception);
@@ -83,13 +83,13 @@ boolean isReplaying() {
8383
return replaying;
8484
}
8585

86-
Consumer<Throwable> createTimer(long delaySeconds, Consumer<Throwable> callback) {
86+
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
8787
if (delaySeconds < 0) {
8888
throw new IllegalArgumentException("Negative delaySeconds: " + delaySeconds);
8989
}
9090
if (delaySeconds == 0) {
9191
callback.accept(null);
92-
return throwable -> {};
92+
return Exception -> {};
9393
}
9494
long firingTime = currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
9595
// As the timer resolution is 1 second it doesn't really make sense to update a timer
@@ -101,24 +101,24 @@ Consumer<Throwable> createTimer(long delaySeconds, Consumer<Throwable> callback)
101101
return null;
102102
}
103103
}
104-
Consumer<Throwable> result = null;
104+
Consumer<Exception> result = null;
105105
if (!timersByFiringTime.containsKey(firingTime)) {
106106
final OpenRequestInfo<?, Long> context = new OpenRequestInfo<>(firingTime);
107107
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes();
108108
timer.setStartToFireTimeoutSeconds(delaySeconds);
109109
final String timerId = decisions.getNextId();
110110
timer.setTimerId(timerId);
111111
decisions.startTimer(timer, null);
112-
context.setCompletionHandle((ctx, throwable) -> callback.accept(null));
112+
context.setCompletionHandle((ctx, Exception) -> callback.accept(null));
113113
scheduledTimers.put(timerId, context);
114114
timersByFiringTime.put(firingTime, timerId);
115115
result = new ClockDecisionContext.TimerCancellationHandler(timerId);
116116
}
117-
SortedMap<Long, String> toCancel = timersByFiringTime.subMap(0l, firingTime);
117+
SortedMap<Long, String> toCancel = timersByFiringTime.subMap(0L, firingTime);
118118
for (String timerId : toCancel.values()) {
119119
decisions.cancelTimer(timerId, () -> {
120120
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
121-
BiConsumer<?, RuntimeException> context = scheduled.getCompletionCallback();
121+
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
122122
CancellationException exception = new CancellationException("Cancelled as next unblock time changed");
123123
context.accept(null, exception);
124124
});
@@ -131,7 +131,7 @@ void cancelAllTimers() {
131131
for (String timerId : timersByFiringTime.values()) {
132132
decisions.cancelTimer(timerId, () -> {
133133
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
134-
BiConsumer<?, RuntimeException> context = scheduled.getCompletionCallback();
134+
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
135135
CancellationException exception = new CancellationException("Cancelled as next unblock time changed");
136136
context.accept(null, exception);
137137
});
@@ -148,7 +148,7 @@ void handleTimerFired(TimerFiredEventAttributes attributes) {
148148
if (decisions.handleTimerClosed(timerId)) {
149149
OpenRequestInfo<?, Long> scheduled = scheduledTimers.remove(timerId);
150150
if (scheduled != null) {
151-
BiConsumer<?, RuntimeException> completionCallback = scheduled.getCompletionCallback();
151+
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
152152
completionCallback.accept(null, null);
153153
long firingTime = scheduled.getUserContext();
154154
timersByFiringTime.remove(firingTime);
@@ -162,7 +162,7 @@ void handleTimerCanceled(HistoryEvent event) {
162162
if (decisions.handleTimerCanceled(event)) {
163163
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
164164
if (scheduled != null) {
165-
BiConsumer<?, RuntimeException> completionCallback = scheduled.getCompletionCallback();
165+
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
166166
CancellationException exception = new CancellationException("Cancelled by request");
167167
completionCallback.accept(null, exception);
168168
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public interface DecisionContext {
6969
* @param callback Callback that is called upon activity completion or failure.
7070
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel activity task.
7171
*/
72-
Consumer<Throwable> scheduleActivityTask(ExecuteActivityParameters parameters,
73-
BiConsumer<byte[], RuntimeException> callback);
72+
Consumer<Exception> scheduleActivityTask(ExecuteActivityParameters parameters,
73+
BiConsumer<byte[], Exception> callback);
7474

7575

7676
/**
@@ -81,10 +81,10 @@ Consumer<Throwable> scheduleActivityTask(ExecuteActivityParameters parameters,
8181
* @param callback Callback that is called upon child workflow completion or failure.
8282
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel activity task.
8383
*/
84-
Consumer<Throwable> startChildWorkflow(StartChildWorkflowExecutionParameters parameters, Consumer<WorkflowExecution> executionCallback,
85-
BiConsumer<byte[], RuntimeException> callback);
84+
Consumer<Exception> startChildWorkflow(StartChildWorkflowExecutionParameters parameters, Consumer<WorkflowExecution> executionCallback,
85+
BiConsumer<byte[], Exception> callback);
8686

87-
// TODO(Cadence): Promise<Void> signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);
87+
Consumer<Exception> signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback);
8888

8989
void requestCancelWorkflowExecution(WorkflowExecution execution);
9090

@@ -118,7 +118,7 @@ Consumer<Throwable> startChildWorkflow(StartChildWorkflowExecutionParameters par
118118
* CancellationException is passed as a parameter in case of a cancellation.
119119
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel timer.
120120
*/
121-
Consumer<Throwable> createTimer(long delaySeconds, Consumer<Throwable> callback);
121+
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
122122

123123
void cancelAllTimers();
124124

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,24 @@ public Duration getExecutionStartToCloseTimeout() {
111111
}
112112

113113
@Override
114-
public Consumer<Throwable> scheduleActivityTask(ExecuteActivityParameters parameters,
115-
BiConsumer<byte[], RuntimeException> callback) {
114+
public Consumer<Exception> scheduleActivityTask(ExecuteActivityParameters parameters,
115+
BiConsumer<byte[], Exception> callback) {
116116
return activityClient.scheduleActivityTask(parameters, callback);
117117
}
118118

119119
@Override
120-
public Consumer<Throwable> startChildWorkflow(StartChildWorkflowExecutionParameters parameters,
120+
public Consumer<Exception> startChildWorkflow(StartChildWorkflowExecutionParameters parameters,
121121
Consumer<WorkflowExecution> executionCallback,
122-
BiConsumer<byte[], RuntimeException> callback) {
122+
BiConsumer<byte[], Exception> callback) {
123123
return workflowClient.startChildWorkflow(parameters, executionCallback, callback);
124124
}
125125

126+
127+
public Consumer<Exception> signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters,
128+
BiConsumer<Void, Exception> callback) {
129+
return workflowClient.signalWorkflowExecution(signalParameters, callback);
130+
}
131+
126132
@Override
127133
public void requestCancelWorkflowExecution(WorkflowExecution execution) {
128134
workflowClient.requestCancelWorkflowExecution(execution);
@@ -148,7 +154,7 @@ public boolean isReplaying() {
148154
}
149155

150156
@Override
151-
public Consumer<Throwable> createTimer(long delaySeconds, Consumer<Throwable> callback) {
157+
public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
152158
return workflowClock.createTimer(delaySeconds, callback);
153159
}
154160

@@ -240,4 +246,12 @@ public void handleTimerFired(TimerFiredEventAttributes attributes) {
240246
public void handleTimerCanceled(HistoryEvent event) {
241247
workflowClock.handleTimerCanceled(event);
242248
}
249+
250+
public void handleSignalExternalWorkflowExecutionFailed(HistoryEvent event) {
251+
workflowClient.handleSignalExternalWorkflowExecutionFailed(event);
252+
}
253+
254+
public void handleExternalWorkflowExecutionSignaled(HistoryEvent event) {
255+
workflowClient.handleExternalWorkflowExecutionSignaled(event);
256+
}
243257
}

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2121
import com.uber.cadence.internal.worker.WorkflowExecutionException;
2222

23+
import java.nio.charset.StandardCharsets;
2324
import java.util.ArrayList;
2425
import java.util.HashMap;
2526
import java.util.Iterator;
@@ -160,6 +161,24 @@ void handleRequestCancelExternalWorkflowExecutionFailed(HistoryEvent event) {
160161
decision.handleCancellationFailureEvent(event);
161162
}
162163

164+
void signalExternalWorkflowExecution(SignalExternalWorkflowExecutionDecisionAttributes signal) {
165+
DecisionId decisionId = new DecisionId(DecisionTarget.SIGNAL, new String(signal.getControl(), StandardCharsets.UTF_8));
166+
addDecision(decisionId, new SignalDecisionStateMachine(decisionId, signal));
167+
}
168+
169+
void cancelSignalExternalWorkflowExecution(String signalId, Runnable immediateCancellationCallback) {
170+
DecisionStateMachine decision = getDecision(new DecisionId(DecisionTarget.SIGNAL, signalId));
171+
decision.cancel(immediateCancellationCallback);
172+
}
173+
174+
void handleSignalExternalWorkflowExecutionInitiated(HistoryEvent event) {
175+
SignalExternalWorkflowExecutionInitiatedEventAttributes attributes = event.getSignalExternalWorkflowExecutionInitiatedEventAttributes();
176+
String signalId = new String(attributes.getControl(), StandardCharsets.UTF_8);
177+
signalInitiatedEventIdToSignalId.put(event.getEventId(), signalId);
178+
DecisionStateMachine decision = getDecision(new DecisionId(DecisionTarget.SIGNAL, signalId));
179+
decision.handleInitiatedEvent(event);
180+
}
181+
163182
public boolean handleSignalExternalWorkflowExecutionFailed(String signalId) {
164183
DecisionStateMachine decision = getDecision(new DecisionId(DecisionTarget.SIGNAL, signalId));
165184
decision.handleCompletionEvent();

src/main/java/com/uber/cadence/internal/replay/OpenRequestInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
class OpenRequestInfo<T, C> {
2222

23-
BiConsumer<T, RuntimeException> completionHandle;
23+
BiConsumer<T, Exception> completionHandle;
2424

2525
final C userContext;
2626

@@ -32,11 +32,11 @@ class OpenRequestInfo<T, C> {
3232
this.userContext = userContext;
3333
}
3434

35-
BiConsumer<T, RuntimeException> getCompletionCallback() {
35+
BiConsumer<T, Exception> getCompletionCallback() {
3636
return completionHandle;
3737
}
3838

39-
void setCompletionHandle(BiConsumer<T, RuntimeException> context) {
39+
void setCompletionHandle(BiConsumer<T, Exception> context) {
4040
this.completionHandle = context;
4141
}
4242

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ private void processEvent(HistoryEvent event, EventType eventType) throws Throwa
125125
case DecisionTaskTimedOut:
126126
// Handled in the processEvent(event)
127127
break;
128-
// case ExternalWorkflowExecutionSignaled:
129-
// workflowClient.handleExternalWorkflowExecutionSignaled(event);
130-
// break;
128+
case ExternalWorkflowExecutionSignaled:
129+
context.handleExternalWorkflowExecutionSignaled(event);
130+
break;
131131
case StartChildWorkflowExecutionFailed:
132132
context.handleStartChildWorkflowExecutionFailed(event);
133133
break;
@@ -174,9 +174,12 @@ private void processEvent(HistoryEvent event, EventType eventType) throws Throwa
174174
case TimerCanceled:
175175
context.handleTimerCanceled(event);
176176
break;
177-
// case SignalExternalWorkflowExecutionInitiated:
178-
// decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event);
179-
// break;
177+
case SignalExternalWorkflowExecutionInitiated:
178+
decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event);
179+
break;
180+
case SignalExternalWorkflowExecutionFailed:
181+
context.handleSignalExternalWorkflowExecutionFailed(event);
182+
break;
180183
case RequestCancelExternalWorkflowExecutionInitiated:
181184
decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event);
182185
break;
@@ -188,6 +191,9 @@ private void processEvent(HistoryEvent event, EventType eventType) throws Throwa
188191
break;
189192
case CancelTimerFailed:
190193
decisionsHelper.handleCancelTimerFailed(event);
194+
break;
195+
case DecisionTaskFailed:
196+
break;
191197
}
192198
}
193199

0 commit comments

Comments
 (0)