Skip to content

Commit 4eda239

Browse files
Add setWorkflowIdConflictPolicy (#2055)
Add setWorkflowIdConflictPolicy
1 parent 5c464e8 commit 4eda239

File tree

7 files changed

+280
-30
lines changed

7 files changed

+280
-30
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.client;
2222

2323
import com.google.common.base.Objects;
24+
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
2425
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
2526
import io.temporal.common.CronSchedule;
2627
import io.temporal.common.MethodRetry;
@@ -77,6 +78,7 @@ public static WorkflowOptions merge(
7778
.setContextPropagators(o.getContextPropagators())
7879
.setDisableEagerExecution(o.isDisableEagerExecution())
7980
.setStartDelay(o.getStartDelay())
81+
.setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy())
8082
.validateBuildWithDefaults();
8183
}
8284

@@ -110,6 +112,8 @@ public static final class Builder {
110112

111113
private Duration startDelay;
112114

115+
private WorkflowIdConflictPolicy workflowIdConflictpolicy;
116+
113117
private Builder() {}
114118

115119
private Builder(WorkflowOptions options) {
@@ -130,6 +134,7 @@ private Builder(WorkflowOptions options) {
130134
this.contextPropagators = options.contextPropagators;
131135
this.disableEagerExecution = options.disableEagerExecution;
132136
this.startDelay = options.startDelay;
137+
this.workflowIdConflictpolicy = options.workflowIdConflictpolicy;
133138
}
134139

135140
/**
@@ -145,7 +150,8 @@ public Builder setWorkflowId(String workflowId) {
145150
/**
146151
* Specifies server behavior if a completed workflow with the same id exists. Note that under no
147152
* conditions Temporal allows two workflows with the same namespace and workflow id run
148-
* simultaneously.
153+
* simultaneously. See {@line setWorkflowIdConflictPolicy} for handling a workflow id
154+
* duplication with a <b>Running</b> workflow.
149155
*
150156
* <p>Default value if not set: <b>AllowDuplicate</b>
151157
*
@@ -165,6 +171,25 @@ public Builder setWorkflowIdReusePolicy(WorkflowIdReusePolicy workflowIdReusePol
165171
return this;
166172
}
167173

174+
/**
175+
* Specifies server behavior if a <b>Running</b> workflow with the same id exists. See {@link
176+
* #setWorkflowIdReusePolicy} for handling a workflow id duplication with a <b>Closed</b>
177+
* workflow. Cannot be set when {@link #getWorkflowIdReusePolicy()} is {@link
178+
* WorkflowIdReusePolicy#WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING}.
179+
*
180+
* <ul>
181+
* <li><b>Fail</b> Don't start a new workflow; instead return {@link
182+
* WorkflowExecutionAlreadyStarted}
183+
* <li><b>UseExisting</b> Don't start a new workflow; instead return the handle for the
184+
* running workflow.
185+
* <li><b>TerminateExisting</b> Terminate the running workflow before starting a new one.
186+
* </ul>
187+
*/
188+
public Builder setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy workflowIdConflictpolicy) {
189+
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
190+
return this;
191+
}
192+
168193
/**
169194
* The time after which a workflow run is automatically terminated by Temporal service with
170195
* WORKFLOW_EXECUTION_TIMED_OUT status.
@@ -374,7 +399,8 @@ public WorkflowOptions build() {
374399
typedSearchAttributes,
375400
contextPropagators,
376401
disableEagerExecution,
377-
startDelay);
402+
startDelay,
403+
workflowIdConflictpolicy);
378404
}
379405

380406
/**
@@ -395,7 +421,8 @@ public WorkflowOptions validateBuildWithDefaults() {
395421
typedSearchAttributes,
396422
contextPropagators,
397423
disableEagerExecution,
398-
startDelay);
424+
startDelay,
425+
workflowIdConflictpolicy);
399426
}
400427
}
401428

@@ -427,6 +454,8 @@ public WorkflowOptions validateBuildWithDefaults() {
427454

428455
private final Duration startDelay;
429456

457+
private final WorkflowIdConflictPolicy workflowIdConflictpolicy;
458+
430459
private WorkflowOptions(
431460
String workflowId,
432461
WorkflowIdReusePolicy workflowIdReusePolicy,
@@ -441,7 +470,8 @@ private WorkflowOptions(
441470
SearchAttributes typedSearchAttributes,
442471
List<ContextPropagator> contextPropagators,
443472
boolean disableEagerExecution,
444-
Duration startDelay) {
473+
Duration startDelay,
474+
WorkflowIdConflictPolicy workflowIdConflictpolicy) {
445475
this.workflowId = workflowId;
446476
this.workflowIdReusePolicy = workflowIdReusePolicy;
447477
this.workflowRunTimeout = workflowRunTimeout;
@@ -456,6 +486,7 @@ private WorkflowOptions(
456486
this.contextPropagators = contextPropagators;
457487
this.disableEagerExecution = disableEagerExecution;
458488
this.startDelay = startDelay;
489+
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
459490
}
460491

461492
public String getWorkflowId() {
@@ -523,6 +554,10 @@ public boolean isDisableEagerExecution() {
523554
return startDelay;
524555
}
525556

557+
public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() {
558+
return workflowIdConflictpolicy;
559+
}
560+
526561
public Builder toBuilder() {
527562
return new Builder(this);
528563
}
@@ -545,7 +580,8 @@ public boolean equals(Object o) {
545580
&& Objects.equal(typedSearchAttributes, that.typedSearchAttributes)
546581
&& Objects.equal(contextPropagators, that.contextPropagators)
547582
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
548-
&& Objects.equal(startDelay, that.startDelay);
583+
&& Objects.equal(startDelay, that.startDelay)
584+
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy);
549585
}
550586

551587
@Override
@@ -564,7 +600,8 @@ public int hashCode() {
564600
typedSearchAttributes,
565601
contextPropagators,
566602
disableEagerExecution,
567-
startDelay);
603+
startDelay,
604+
workflowIdConflictpolicy);
568605
}
569606

570607
@Override
@@ -601,6 +638,8 @@ public String toString() {
601638
+ disableEagerExecution
602639
+ ", startDelay="
603640
+ startDelay
641+
+ ", workflowIdConflictpolicy="
642+
+ workflowIdConflictpolicy
604643
+ '}';
605644
}
606645
}

temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
103103
if (wfOptions.getCronSchedule() != null) {
104104
throw new IllegalArgumentException("Cron schedule cannot be set on scheduled workflow");
105105
}
106+
if (wfOptions.getWorkflowIdConflictPolicy() != null) {
107+
throw new IllegalArgumentException(
108+
"ID conflict policy cannot change from default for scheduled workflow");
109+
}
106110
// Validate required options
107111
if (wfOptions.getWorkflowId() == null || wfOptions.getWorkflowId().isEmpty()) {
108112
throw new IllegalArgumentException("ID required on workflow action");

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
8282
request.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
8383
}
8484

85+
if (options.getWorkflowIdConflictPolicy() != null) {
86+
request.setWorkflowIdConflictPolicy(options.getWorkflowIdConflictPolicy());
87+
}
88+
8589
String taskQueue = options.getTaskQueue();
8690
if (taskQueue != null && !taskQueue.isEmpty()) {
8791
request.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build());
@@ -138,6 +142,7 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
138142
.setWorkflowTaskTimeout(startParameters.getWorkflowTaskTimeout())
139143
.setWorkflowType(startParameters.getWorkflowType())
140144
.setWorkflowIdReusePolicy(startParameters.getWorkflowIdReusePolicy())
145+
.setWorkflowIdConflictPolicy(startParameters.getWorkflowIdConflictPolicy())
141146
.setCronSchedule(startParameters.getCronSchedule());
142147

143148
String workflowId = startParameters.getWorkflowId();
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client.functional;
22+
23+
import static org.junit.Assert.*;
24+
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
27+
import io.temporal.client.*;
28+
import io.temporal.testing.internal.SDKTestOptions;
29+
import io.temporal.testing.internal.SDKTestWorkflowRule;
30+
import io.temporal.workflow.Workflow;
31+
import io.temporal.workflow.shared.TestWorkflows;
32+
import java.util.Optional;
33+
import java.util.UUID;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
37+
public class WorkflowIdConflictPolicyTest {
38+
@Rule
39+
public SDKTestWorkflowRule testWorkflowRule =
40+
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestWorkflowImpl2.class).build();
41+
42+
@Test
43+
public void policyTerminateExisting() {
44+
String workflowId = UUID.randomUUID().toString();
45+
46+
WorkflowOptions.Builder workflowOptionsBuilder =
47+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
48+
.setWorkflowIdConflictPolicy(
49+
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)
50+
.setWorkflowId(workflowId);
51+
52+
TestWorkflows.TestSignaledWorkflow workflow1 =
53+
testWorkflowRule
54+
.getWorkflowClient()
55+
.newWorkflowStub(
56+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
57+
WorkflowStub workflowStub1 = WorkflowStub.fromTyped(workflow1);
58+
WorkflowExecution workflowExecution1 = workflowStub1.start();
59+
assertNotNull(workflowStub1.getExecution());
60+
61+
TestWorkflows.TestSignaledWorkflow workflow2 =
62+
testWorkflowRule
63+
.getWorkflowClient()
64+
.newWorkflowStub(
65+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
66+
WorkflowStub workflowStub2 = WorkflowStub.fromTyped(workflow2);
67+
workflowStub2.start();
68+
assertNotNull(workflowStub2.getExecution());
69+
70+
// WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING means that calling start with a workflow ID
71+
// that already has a running workflow will terminate the existing execution.
72+
assertNotEquals(workflowStub1.getExecution(), workflowStub2.getExecution());
73+
workflow2.signal("test");
74+
assertEquals("done", workflowStub2.getResult(String.class));
75+
assertThrows(
76+
WorkflowFailedException.class,
77+
() ->
78+
testWorkflowRule
79+
.getWorkflowClient()
80+
.newUntypedWorkflowStub(
81+
workflowExecution1,
82+
Optional.of(TestWorkflows.TestSignaledWorkflow.class.toString()))
83+
.getResult(String.class));
84+
}
85+
86+
@Test
87+
public void policyUseExisting() {
88+
String workflowId = UUID.randomUUID().toString();
89+
90+
WorkflowOptions.Builder workflowOptionsBuilder =
91+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
92+
.setWorkflowIdConflictPolicy(
93+
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING)
94+
.setWorkflowId(workflowId);
95+
96+
TestWorkflows.TestSignaledWorkflow workflow1 =
97+
testWorkflowRule
98+
.getWorkflowClient()
99+
.newWorkflowStub(
100+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
101+
WorkflowStub workflowStub1 = WorkflowStub.fromTyped(workflow1);
102+
workflowStub1.start();
103+
assertNotNull(workflowStub1.getExecution());
104+
105+
TestWorkflows.TestSignaledWorkflow workflow2 =
106+
testWorkflowRule
107+
.getWorkflowClient()
108+
.newWorkflowStub(
109+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
110+
WorkflowStub workflowStub2 = WorkflowStub.fromTyped(workflow2);
111+
workflowStub2.start();
112+
assertNotNull(workflowStub1.getExecution());
113+
114+
// WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING means that calling start with a workflow ID
115+
// that already has a running workflow will return the already running execution.
116+
assertEquals(workflowStub1.getExecution(), workflowStub2.getExecution());
117+
workflow2.signal("test");
118+
assertEquals("done", workflowStub1.getResult(String.class));
119+
assertEquals("done", workflowStub2.getResult(String.class));
120+
}
121+
122+
@Test
123+
public void policyFail() {
124+
String workflowId = UUID.randomUUID().toString();
125+
126+
WorkflowOptions.Builder workflowOptionsBuilder =
127+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
128+
.setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL)
129+
.setWorkflowId(workflowId);
130+
131+
TestWorkflows.TestSignaledWorkflow workflow1 =
132+
testWorkflowRule
133+
.getWorkflowClient()
134+
.newWorkflowStub(
135+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
136+
WorkflowStub workflowStub1 = WorkflowStub.fromTyped(workflow1);
137+
workflowStub1.start();
138+
assertNotNull(workflowStub1.getExecution());
139+
140+
TestWorkflows.TestSignaledWorkflow workflow2 =
141+
testWorkflowRule
142+
.getWorkflowClient()
143+
.newWorkflowStub(
144+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
145+
WorkflowStub workflowStub2 = WorkflowStub.fromTyped(workflow2);
146+
// WORKFLOW_ID_CONFLICT_POLICY_FAIL means that calling start with a workflow ID
147+
// that already has a running workflow will fail.
148+
assertThrows(WorkflowExecutionAlreadyStarted.class, () -> workflowStub2.start());
149+
workflow1.signal("test");
150+
assertEquals("done", workflowStub1.getResult(String.class));
151+
}
152+
153+
@Test
154+
public void policyDefault() {
155+
String workflowId = UUID.randomUUID().toString();
156+
157+
WorkflowOptions.Builder workflowOptionsBuilder =
158+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
159+
.setWorkflowId(workflowId);
160+
161+
TestWorkflows.TestSignaledWorkflow workflow1 =
162+
testWorkflowRule
163+
.getWorkflowClient()
164+
.newWorkflowStub(
165+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
166+
WorkflowStub workflowStub1 = WorkflowStub.fromTyped(workflow1);
167+
workflowStub1.start();
168+
assertNotNull(workflowStub1.getExecution());
169+
170+
TestWorkflows.TestSignaledWorkflow workflow2 =
171+
testWorkflowRule
172+
.getWorkflowClient()
173+
.newWorkflowStub(
174+
TestWorkflows.TestSignaledWorkflow.class, workflowOptionsBuilder.build());
175+
WorkflowStub workflowStub2 = WorkflowStub.fromTyped(workflow2);
176+
// Default policy is WORKFLOW_ID_CONFLICT_POLICY_FAIL
177+
assertThrows(WorkflowExecutionAlreadyStarted.class, () -> workflowStub2.start());
178+
workflow1.signal("test");
179+
assertEquals("done", workflowStub1.getResult(String.class));
180+
}
181+
182+
public static class TestWorkflowImpl2 implements TestWorkflows.TestSignaledWorkflow {
183+
boolean done = false;
184+
185+
@Override
186+
public String execute() {
187+
Workflow.await(() -> done);
188+
return "done";
189+
}
190+
191+
@Override
192+
public void signal(String arg) {
193+
done = true;
194+
}
195+
}
196+
}

0 commit comments

Comments
 (0)