Skip to content

Commit db299b9

Browse files
Adding cross domain signaling (#755)
Adding cross domain signaling.
1 parent 3d44b45 commit db299b9

14 files changed

+286
-11
lines changed

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowStubImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.workflow.CompletablePromise;
2626
import com.uber.cadence.workflow.Promise;
2727
import com.uber.cadence.workflow.SignalExternalWorkflowException;
28+
import com.uber.cadence.workflow.SignalOptions;
2829
import com.uber.cadence.workflow.Workflow;
2930
import com.uber.cadence.workflow.WorkflowInterceptor;
3031
import com.uber.cadence.workflow.WorkflowInterceptor.WorkflowResult;
@@ -113,4 +114,23 @@ public void signal(String signalName, Object... args) {
113114
throw e;
114115
}
115116
}
117+
118+
@Override
119+
public void signal(SignalOptions signalOptions, Object... args) {
120+
Promise<Void> signaled =
121+
decisionContext.signalExternalWorkflow(
122+
signalOptions.getDomain(), execution.get(), signalOptions.getSignalName(), args);
123+
if (AsyncInternal.isAsync()) {
124+
AsyncInternal.setAsyncResult(signaled);
125+
return;
126+
}
127+
try {
128+
signaled.get();
129+
} catch (SignalExternalWorkflowException e) {
130+
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
131+
// an event handling method.
132+
e.setStackTrace(Thread.currentThread().getStackTrace());
133+
throw e;
134+
}
135+
}
116136
}

src/main/java/com/uber/cadence/internal/sync/ExternalWorkflowStubImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.workflow.ExternalWorkflowStub;
2323
import com.uber.cadence.workflow.Promise;
2424
import com.uber.cadence.workflow.SignalExternalWorkflowException;
25+
import com.uber.cadence.workflow.SignalOptions;
2526
import com.uber.cadence.workflow.WorkflowInterceptor;
2627
import java.util.Objects;
2728

@@ -59,6 +60,25 @@ public void signal(String signalName, Object... args) {
5960
}
6061
}
6162

63+
@Override
64+
public void signal(SignalOptions signalOptions, Object... args) {
65+
Promise<Void> signaled =
66+
decisionContext.signalExternalWorkflow(
67+
signalOptions.getDomain(), execution, signalOptions.getSignalName(), args);
68+
if (AsyncInternal.isAsync()) {
69+
AsyncInternal.setAsyncResult(signaled);
70+
return;
71+
}
72+
try {
73+
signaled.get();
74+
} catch (SignalExternalWorkflowException e) {
75+
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
76+
// an event handling method.
77+
e.setStackTrace(Thread.currentThread().getStackTrace());
78+
throw e;
79+
}
80+
}
81+
6282
@Override
6383
public void cancel() {
6484
Promise<Void> cancelRequested = decisionContext.cancelWorkflow(execution);

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,10 +634,17 @@ public DecisionContext getContext() {
634634
@Override
635635
public Promise<Void> signalExternalWorkflow(
636636
WorkflowExecution execution, String signalName, Object[] args) {
637+
return this.signalExternalWorkflow(null, execution, signalName, args);
638+
}
639+
640+
@Override
641+
public Promise<Void> signalExternalWorkflow(
642+
String domain, WorkflowExecution execution, String signalName, Object[] args) {
637643
SignalExternalWorkflowParameters parameters = new SignalExternalWorkflowParameters();
638644
parameters.setSignalName(signalName);
639645
parameters.setWorkflowId(execution.getWorkflowId());
640646
parameters.setRunId(execution.getRunId());
647+
parameters.setDomain(domain);
641648
byte[] input = getDataConverter().toData(args);
642649
parameters.setInput(input);
643650
CompletablePromise<Void> result = Workflow.newPromise();

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ public Random newRandom() {
191191
throw new UnsupportedOperationException("not implemented");
192192
}
193193

194+
@Override
195+
public Promise<Void> signalExternalWorkflow(
196+
String domain, WorkflowExecution execution, String signalName, Object[] args) {
197+
throw new UnsupportedOperationException("not implemented");
198+
}
199+
194200
@Override
195201
public Promise<Void> signalExternalWorkflow(
196202
WorkflowExecution execution, String signalName, Object[] args) {

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,23 @@ public final class TestWorkflowEnvironmentInternal implements TestWorkflowEnviro
121121
private final WorkflowServiceWrapper service;
122122
private final WorkerFactory workerFactory;
123123

124-
public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
124+
public TestWorkflowEnvironmentInternal(
125+
WorkflowServiceWrapper workflowServiceWrapper, TestEnvironmentOptions options) {
125126
if (options == null) {
126127
this.testEnvironmentOptions = new TestEnvironmentOptions.Builder().build();
127128
} else {
128129
this.testEnvironmentOptions = options;
129130
}
130-
service = new WorkflowServiceWrapper();
131-
service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
131+
132+
if (workflowServiceWrapper == null) {
133+
this.service = new WorkflowServiceWrapper();
134+
} else {
135+
this.service = workflowServiceWrapper;
136+
}
137+
138+
this.service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
132139
WorkflowClient client =
133-
WorkflowClient.newInstance(service, testEnvironmentOptions.getWorkflowClientOptions());
140+
WorkflowClient.newInstance(this.service, testEnvironmentOptions.getWorkflowClientOptions());
134141
workerFactory =
135142
WorkerFactory.newInstance(client, testEnvironmentOptions.getWorkerFactoryOptions());
136143
}
@@ -253,11 +260,11 @@ public WorkerFactory getWorkerFactory() {
253260
return workerFactory;
254261
}
255262

256-
private static class WorkflowServiceWrapper implements IWorkflowService {
263+
public static class WorkflowServiceWrapper implements IWorkflowService {
257264

258265
private final TestWorkflowService impl;
259266

260-
private WorkflowServiceWrapper() {
267+
public WorkflowServiceWrapper() {
261268
impl = new TestWorkflowService();
262269
}
263270

src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,17 @@
9393
public interface TestWorkflowEnvironment {
9494

9595
static TestWorkflowEnvironment newInstance() {
96-
return new TestWorkflowEnvironmentInternal(new TestEnvironmentOptions.Builder().build());
96+
return new TestWorkflowEnvironmentInternal(null, new TestEnvironmentOptions.Builder().build());
9797
}
9898

9999
static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
100-
return new TestWorkflowEnvironmentInternal(options);
100+
return new TestWorkflowEnvironmentInternal(null, options);
101+
}
102+
103+
static TestWorkflowEnvironment newInstance(
104+
TestWorkflowEnvironmentInternal.WorkflowServiceWrapper workflowService,
105+
TestEnvironmentOptions options) {
106+
return new TestWorkflowEnvironmentInternal(workflowService, options);
101107
}
102108

103109
/**

src/main/java/com/uber/cadence/workflow/ChildWorkflowStub.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,6 @@ public interface ChildWorkflowStub {
4444
<R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object... args);
4545

4646
void signal(String signalName, Object... args);
47+
48+
void signal(SignalOptions signalOptions, Object... args);
4749
}

src/main/java/com/uber/cadence/workflow/ExternalWorkflowStub.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,7 @@ public interface ExternalWorkflowStub {
3232

3333
void signal(String signalName, Object... args);
3434

35+
void signal(SignalOptions signalOptions, Object... args);
36+
3537
void cancel();
3638
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.workflow;
17+
18+
public class SignalOptions {
19+
20+
private String domain;
21+
private String signalName;
22+
23+
private SignalOptions() {}
24+
25+
public static SignalOptions.Builder newBuilder() {
26+
return new Builder();
27+
}
28+
29+
public static class Builder {
30+
private SignalOptions signalOptions = new SignalOptions();
31+
32+
public SignalOptions.Builder setDomain(String domain) {
33+
signalOptions.setDomain(domain);
34+
return this;
35+
}
36+
37+
public SignalOptions.Builder setSignalName(String signalName) {
38+
signalOptions.setSignalName(signalName);
39+
return this;
40+
}
41+
42+
public SignalOptions build() {
43+
if (signalOptions.getSignalName() == null) {
44+
throw new IllegalArgumentException("Signal name must be provided");
45+
}
46+
47+
return signalOptions;
48+
}
49+
}
50+
51+
public SignalOptions setDomain(String domain) {
52+
this.domain = domain;
53+
return this;
54+
}
55+
56+
public String getDomain() {
57+
return this.domain;
58+
}
59+
60+
public SignalOptions setSignalName(String signalName) {
61+
this.signalName = signalName;
62+
return this;
63+
}
64+
65+
public String getSignalName() {
66+
return this.signalName;
67+
}
68+
}

src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ <R> WorkflowResult<R> executeChildWorkflow(
7777
Promise<Void> signalExternalWorkflow(
7878
WorkflowExecution execution, String signalName, Object[] args);
7979

80+
Promise<Void> signalExternalWorkflow(
81+
String domain, WorkflowExecution execution, String signalName, Object[] args);
82+
8083
Promise<Void> cancelWorkflow(WorkflowExecution execution);
8184

8285
void sleep(Duration duration);

0 commit comments

Comments
 (0)