Skip to content

Commit 98b2e78

Browse files
Disallow continue as new in update handlers (#2167)
Disallow continue as new in an update handler
1 parent 4871168 commit 98b2e78

File tree

8 files changed

+166
-399
lines changed

8 files changed

+166
-399
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.statemachines;
22+
23+
/**
24+
* Thrown when {@link io.temporal.workflow.Workflow#continueAsNew} is called from an unsupported
25+
* location.
26+
*
27+
* <p>The reason this class extends Error is for application workflow code to not catch it by
28+
* mistake. The default behavior of the SDK is to block workflow execution while Error is thrown.
29+
*/
30+
public class UnsupportedContinueAsNewRequest extends Error {
31+
public UnsupportedContinueAsNewRequest(String message) {
32+
super(message);
33+
}
34+
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.failure.CanceledFailure;
3030
import io.temporal.failure.TemporalFailure;
3131
import io.temporal.internal.replay.ReplayWorkflowContext;
32+
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
3233
import io.temporal.internal.worker.WorkflowExecutionException;
3334
import io.temporal.worker.WorkflowImplementationOptions;
3435
import io.temporal.workflow.Workflow;
@@ -123,6 +124,9 @@ public Optional<Payloads> handleExecuteUpdate(
123124
io.temporal.api.common.v1.Header header) {
124125
try {
125126
return context.handleExecuteUpdate(updateName, input, eventId, new Header(header));
127+
} catch (UnsupportedContinueAsNewRequest e) {
128+
// Re-throw to fail the workflow task
129+
throw e;
126130
} catch (Throwable e) {
127131
applyWorkflowFailurePolicyAndRethrow(e);
128132
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.temporal.internal.common.NonIdempotentHandle;
4646
import io.temporal.internal.common.SearchAttributesUtil;
4747
import io.temporal.internal.logging.ReplayAwareLogger;
48+
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
4849
import io.temporal.serviceclient.CheckedExceptionWrapper;
4950
import io.temporal.workflow.*;
5051
import io.temporal.workflow.Functions.Func;
@@ -576,6 +577,7 @@ public static <R> R retry(
576577
public static void continueAsNew(
577578
@Nullable String workflowType, @Nullable ContinueAsNewOptions options, Object[] args) {
578579
assertNotReadOnly("continue as new");
580+
assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
579581
getWorkflowOutboundInterceptor()
580582
.continueAsNew(
581583
new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
@@ -588,6 +590,7 @@ public static void continueAsNew(
588590
Object[] args,
589591
WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
590592
assertNotReadOnly("continue as new");
593+
assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
591594
outboundCallsInterceptor.continueAsNew(
592595
new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
593596
workflowType, options, args, Header.empty()));
@@ -760,6 +763,12 @@ static void assertNotReadOnly(String action) {
760763
}
761764
}
762765

766+
static void assertNotInUpdateHandler(String message) {
767+
if (getCurrentUpdateInfo().isPresent()) {
768+
throw new UnsupportedContinueAsNewRequest(message);
769+
}
770+
}
771+
763772
private static WorkflowThread getWorkflowThread() {
764773
return DeterministicRunnerImpl.currentThreadInternal();
765774
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.updateTest;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertThrows;
25+
26+
import io.temporal.api.common.v1.WorkflowExecution;
27+
import io.temporal.client.*;
28+
import io.temporal.failure.ApplicationFailure;
29+
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
30+
import io.temporal.testing.internal.SDKTestOptions;
31+
import io.temporal.testing.internal.SDKTestWorkflowRule;
32+
import io.temporal.worker.WorkflowImplementationOptions;
33+
import io.temporal.workflow.CompletablePromise;
34+
import io.temporal.workflow.Workflow;
35+
import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate;
36+
import java.time.Duration;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.UUID;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
43+
public class UpdateContinueAsNewInHandlerTest {
44+
@Rule
45+
public SDKTestWorkflowRule testWorkflowRule =
46+
SDKTestWorkflowRule.newBuilder()
47+
.setWorkflowTypes(
48+
WorkflowImplementationOptions.newBuilder()
49+
.setFailWorkflowExceptionTypes(UnsupportedContinueAsNewRequest.class)
50+
.build(),
51+
TestUpdateWorkflowImpl.class)
52+
.build();
53+
54+
@Test
55+
public void continueAsNewInUpdateHandler() {
56+
String workflowId = UUID.randomUUID().toString();
57+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
58+
WorkflowOptions options =
59+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
60+
.setWorkflowId(workflowId)
61+
.build();
62+
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
63+
64+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
65+
WorkflowStub stub = WorkflowStub.fromTyped(workflow);
66+
stub.startUpdate(
67+
UpdateOptions.newBuilder(String.class)
68+
.setUpdateName("update")
69+
.setWaitForStage(WorkflowUpdateStage.ACCEPTED)
70+
.build());
71+
WorkflowFailedException e =
72+
assertThrows(WorkflowFailedException.class, () -> stub.getResult(String.class));
73+
assertEquals(
74+
"io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest",
75+
((ApplicationFailure) e.getCause()).getType());
76+
}
77+
78+
public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
79+
String state = "initial";
80+
List<String> updates = new ArrayList<>();
81+
CompletablePromise<Void> promise = Workflow.newPromise();
82+
83+
@Override
84+
public String execute() {
85+
promise.get();
86+
return "";
87+
}
88+
89+
@Override
90+
public String getState() {
91+
return state;
92+
}
93+
94+
@Override
95+
public String update(Integer index, String value) {
96+
// Sleep to make sure the update can be accepted before trying to continueAsNew
97+
Workflow.sleep(Duration.ofSeconds(1));
98+
// This should throw UnsupportedContinueAsNewRequest
99+
Workflow.continueAsNew();
100+
return "";
101+
}
102+
103+
@Override
104+
public void updateValidator(Integer index, String value) {}
105+
106+
@Override
107+
public void complete() {
108+
promise.complete(null);
109+
}
110+
111+
@Override
112+
public void completeValidator() {}
113+
}
114+
}

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewNonDeterminism.java

Lines changed: 0 additions & 120 deletions
This file was deleted.

0 commit comments

Comments
 (0)