Skip to content

Commit 7f6b590

Browse files
authored
Added support for overriding task list when calling continue as new (#223)
1 parent 6df06a2 commit 7f6b590

File tree

10 files changed

+56
-52
lines changed

10 files changed

+56
-52
lines changed

src/main/java/com/uber/cadence/internal/replay/ContinueAsNewWorkflowExecutionParameters.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20-
import com.uber.cadence.ChildPolicy;
2120
import java.nio.charset.StandardCharsets;
2221

2322
public final class ContinueAsNewWorkflowExecutionParameters {
@@ -26,7 +25,6 @@ public final class ContinueAsNewWorkflowExecutionParameters {
2625
private byte[] input;
2726
private String taskList;
2827
private int taskStartToCloseTimeoutSeconds;
29-
private ChildPolicy childPolicy;
3028
private String workflowType;
3129

3230
public void setWorkflowType(String workflowType) {
@@ -85,19 +83,6 @@ public void setTaskStartToCloseTimeoutSeconds(int taskStartToCloseTimeoutSeconds
8583
this.taskStartToCloseTimeoutSeconds = taskStartToCloseTimeoutSeconds;
8684
}
8785

88-
public ChildPolicy getChildPolicy() {
89-
return childPolicy;
90-
}
91-
92-
public void setChildPolicy(ChildPolicy childPolicy) {
93-
this.childPolicy = childPolicy;
94-
}
95-
96-
public ContinueAsNewWorkflowExecutionParameters withChildPolicy(ChildPolicy childPolicy) {
97-
this.childPolicy = childPolicy;
98-
return this;
99-
}
100-
10186
public ContinueAsNewWorkflowExecutionParameters withTaskStartToCloseTimeoutSeconds(
10287
int taskStartToCloseTimeoutSeconds) {
10388
this.taskStartToCloseTimeoutSeconds = taskStartToCloseTimeoutSeconds;

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.ChildPolicy;
2021
import com.uber.cadence.WorkflowExecution;
2122
import com.uber.cadence.WorkflowType;
2223
import com.uber.cadence.converter.DataConverter;
@@ -68,6 +69,8 @@ public interface DecisionContext extends ReplayAware {
6869

6970
Duration getDecisionTaskTimeout();
7071

72+
ChildPolicy getChildPolicy();
73+
7174
/**
7275
* Used to dynamically schedule an activity for execution
7376
*

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20-
import com.uber.cadence.HistoryEvent;
21-
import com.uber.cadence.PollForDecisionTaskResponse;
22-
import com.uber.cadence.TimerFiredEventAttributes;
23-
import com.uber.cadence.WorkflowExecution;
24-
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
25-
import com.uber.cadence.WorkflowType;
20+
import com.uber.cadence.*;
2621
import com.uber.cadence.converter.DataConverter;
2722
import com.uber.cadence.internal.metrics.ReplayAwareScope;
2823
import com.uber.cadence.workflow.Functions.Func;
@@ -128,6 +123,11 @@ public Duration getDecisionTaskTimeout() {
128123
return Duration.ofSeconds(workflowContext.getDecisionTaskTimeoutSeconds());
129124
}
130125

126+
@Override
127+
public ChildPolicy getChildPolicy() {
128+
return workflowContext.getChildPolicy();
129+
}
130+
131131
@Override
132132
public String getTaskList() {
133133
return workflowContext.getTaskList();

src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.ChildPolicy;
2021
import com.uber.cadence.PollForDecisionTaskResponse;
2122
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2223

@@ -116,4 +117,8 @@ String getDomain() {
116117
private WorkflowExecutionStartedEventAttributes getWorkflowStartedEventAttributes() {
117118
return startedAttributes;
118119
}
120+
121+
public ChildPolicy getChildPolicy() {
122+
return startedAttributes.getChildPolicy();
123+
}
119124
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.uber.cadence.ChildPolicy;
2021
import com.uber.cadence.WorkflowExecution;
2122
import com.uber.cadence.WorkflowType;
2223
import com.uber.cadence.converter.DataConverter;
@@ -528,6 +529,11 @@ public Duration getDecisionTaskTimeout() {
528529
throw new UnsupportedOperationException("not implemented");
529530
}
530531

532+
@Override
533+
public ChildPolicy getChildPolicy() {
534+
return ChildPolicy.TERMINATE;
535+
}
536+
531537
@Override
532538
public Consumer<Exception> scheduleActivityTask(
533539
ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,10 +526,12 @@ public void continueAsNew(
526526
parameters.setWorkflowType(workflowType.get());
527527
}
528528
if (options.isPresent()) {
529+
ContinueAsNewOptions ops = options.get();
529530
parameters.setExecutionStartToCloseTimeoutSeconds(
530-
(int) options.get().getExecutionStartToCloseTimeout().getSeconds());
531+
(int) ops.getExecutionStartToCloseTimeout().getSeconds());
531532
parameters.setTaskStartToCloseTimeoutSeconds(
532-
(int) options.get().getTaskStartToCloseTimeout().getSeconds());
533+
(int) ops.getTaskStartToCloseTimeout().getSeconds());
534+
parameters.setTaskList(ops.getTaskList());
533535
}
534536
parameters.setInput(getDataConverter().toData(args));
535537
context.continueAsNewOnCompletion(parameters);

src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.uber.cadence.ChildPolicy;
2021
import com.uber.cadence.internal.replay.DecisionContext;
2122
import com.uber.cadence.workflow.WorkflowInfo;
2223
import java.time.Duration;
@@ -58,4 +59,9 @@ public String getTaskList() {
5859
public Duration getExecutionStartToCloseTimeout() {
5960
return context.getExecutionStartToCloseTimeout();
6061
}
62+
63+
@Override
64+
public ChildPolicy getChildPolicy() {
65+
return context.getChildPolicy();
66+
}
6167
}

src/main/java/com/uber/cadence/workflow/ContinueAsNewOptions.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.uber.cadence.workflow;
1919

20-
import com.uber.cadence.ChildPolicy;
2120
import com.uber.cadence.internal.common.OptionsUtils;
2221
import java.time.Duration;
2322

@@ -28,7 +27,6 @@ public static final class Builder {
2827
private Duration executionStartToCloseTimeout;
2928
private String taskList;
3029
private Duration taskStartToCloseTimeout;
31-
private ChildPolicy childPolicy;
3230

3331
public Builder setExecutionStartToCloseTimeout(Duration executionStartToCloseTimeout) {
3432
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
@@ -45,34 +43,23 @@ public Builder setTaskStartToCloseTimeout(Duration taskStartToCloseTimeout) {
4543
return this;
4644
}
4745

48-
public Builder setChildPolicy(ChildPolicy childPolicy) {
49-
this.childPolicy = childPolicy;
50-
return this;
51-
}
52-
5346
public ContinueAsNewOptions build() {
5447
return new ContinueAsNewOptions(
5548
OptionsUtils.roundUpToSeconds(executionStartToCloseTimeout),
5649
taskList,
57-
OptionsUtils.roundUpToSeconds(taskStartToCloseTimeout),
58-
childPolicy);
50+
OptionsUtils.roundUpToSeconds(taskStartToCloseTimeout));
5951
}
6052
}
6153

6254
private final Duration executionStartToCloseTimeout;
6355
private final String taskList;
6456
private final Duration taskStartToCloseTimeout;
65-
private final ChildPolicy childPolicy;
6657

6758
public ContinueAsNewOptions(
68-
Duration executionStartToCloseTimeout,
69-
String taskList,
70-
Duration taskStartToCloseTimeout,
71-
ChildPolicy childPolicy) {
59+
Duration executionStartToCloseTimeout, String taskList, Duration taskStartToCloseTimeout) {
7260
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
7361
this.taskList = taskList;
7462
this.taskStartToCloseTimeout = taskStartToCloseTimeout;
75-
this.childPolicy = childPolicy;
7663
}
7764

7865
public Duration getExecutionStartToCloseTimeout() {
@@ -86,8 +73,4 @@ public String getTaskList() {
8673
public Duration getTaskStartToCloseTimeout() {
8774
return taskStartToCloseTimeout;
8875
}
89-
90-
public ChildPolicy getChildPolicy() {
91-
return childPolicy;
92-
}
9376
}

src/main/java/com/uber/cadence/workflow/WorkflowInfo.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.workflow;
1919

20+
import com.uber.cadence.ChildPolicy;
2021
import java.time.Duration;
2122

2223
public interface WorkflowInfo {
@@ -32,4 +33,6 @@ public interface WorkflowInfo {
3233
String getTaskList();
3334

3435
Duration getExecutionStartToCloseTimeout();
36+
37+
ChildPolicy getChildPolicy();
3538
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@
2626
import static org.junit.Assert.fail;
2727

2828
import com.google.common.util.concurrent.UncheckedExecutionException;
29-
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
30-
import com.uber.cadence.TimeoutType;
31-
import com.uber.cadence.WorkflowExecution;
32-
import com.uber.cadence.WorkflowIdReusePolicy;
29+
import com.uber.cadence.*;
3330
import com.uber.cadence.activity.Activity;
3431
import com.uber.cadence.activity.ActivityMethod;
3532
import com.uber.cadence.activity.ActivityOptions;
@@ -804,29 +801,42 @@ public void testDetachedScope() {
804801
public interface TestContinueAsNew {
805802

806803
@WorkflowMethod
807-
int execute(int count);
804+
int execute(int count, String continueAsNewTaskList);
808805
}
809806

810807
public static class TestContinueAsNewImpl implements TestContinueAsNew {
811808

812809
@Override
813-
public int execute(int count) {
810+
public int execute(int count, String continueAsNewTaskList) {
811+
String taskList = Workflow.getWorkflowInfo().getTaskList();
814812
if (count == 0) {
813+
assertEquals(continueAsNewTaskList, taskList);
815814
return 111;
816815
}
817-
TestContinueAsNew next = Workflow.newContinueAsNewStub(TestContinueAsNew.class, null);
818-
next.execute(count - 1);
816+
ContinueAsNewOptions options =
817+
new ContinueAsNewOptions.Builder().setTaskList(continueAsNewTaskList).build();
818+
TestContinueAsNew next = Workflow.newContinueAsNewStub(TestContinueAsNew.class, options);
819+
next.execute(count - 1, continueAsNewTaskList);
819820
throw new RuntimeException("unreachable");
820821
}
821822
}
822823

823824
@Test
824825
public void testContinueAsNew() {
826+
Worker w2;
827+
String continuedTaskList = this.taskList + "_continued";
828+
if (useExternalService) {
829+
w2 = workerFactory.newWorker(continuedTaskList);
830+
} else {
831+
w2 = testEnvironment.newWorker(continuedTaskList);
832+
}
833+
w2.registerWorkflowImplementationTypes(TestContinueAsNewImpl.class);
825834
startWorkerFor(TestContinueAsNewImpl.class);
835+
826836
TestContinueAsNew client =
827837
workflowClient.newWorkflowStub(
828-
TestContinueAsNew.class, newWorkflowOptionsBuilder(taskList).build());
829-
int result = client.execute(4);
838+
TestContinueAsNew.class, newWorkflowOptionsBuilder(this.taskList).build());
839+
int result = client.execute(4, continuedTaskList);
830840
assertEquals(111, result);
831841
tracer.setExpected("continueAsNew", "continueAsNew", "continueAsNew", "continueAsNew");
832842
}
@@ -1984,6 +1994,7 @@ public void testSignalUntyped() {
19841994
() -> {
19851995
assertEquals("initial", client.query("QueryableWorkflow::getState", String.class));
19861996
client.signal("testSignal", "Hello ");
1997+
sleep(Duration.ofMillis(500));
19871998
while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) {}
19881999
assertEquals("Hello ", client.query("QueryableWorkflow::getState", String.class));
19892000
client.signal("testSignal", "World!");

0 commit comments

Comments
 (0)