Skip to content

Commit 8b3be3b

Browse files
Don't schedule local activities on a completed workflow (#1908)
Don't schedule LA on completed workflow
1 parent 9b0dcbb commit 8b3be3b

File tree

9 files changed

+417
-7
lines changed

9 files changed

+417
-7
lines changed

.buildkite/pipeline.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ steps:
44
queue: "default"
55
docker: "*"
66
command: "./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest"
7-
timeout_in_minutes: 15
7+
timeout_in_minutes: 20
88
plugins:
99
- docker-compose#v3.8.0:
1010
run: unit-test-test-service-edge
@@ -15,7 +15,7 @@ steps:
1515
queue: "default"
1616
docker: "*"
1717
command: "./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava"
18-
timeout_in_minutes: 15
18+
timeout_in_minutes: 20
1919
plugins:
2020
- docker-compose#v3.8.0:
2121
run: unit-test-docker-jdk8
@@ -26,7 +26,7 @@ steps:
2626
queue: "default"
2727
docker: "*"
2828
command: "./gradlew --no-daemon checkLicenseMain checkLicenses spotlessCheck"
29-
timeout_in_minutes: 15
29+
timeout_in_minutes: 20
3030
plugins:
3131
- docker-compose#v3.8.0:
3232
run: jdk11

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,12 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
311311
throws InterruptedException, Throwable {
312312

313313
while (true) {
314+
// Scheduling or handling any local activities after the workflow method has returned
315+
// can result in commands being generated after the CompleteWorkflowExecution command
316+
// which the server does not allow.
317+
if (context.isWorkflowMethodCompleted()) {
318+
break;
319+
}
314320
List<ExecuteLocalActivityParameters> laRequests =
315321
workflowStateMachines.takeLocalActivityRequests();
316322
localActivityTaskCount += laRequests.size();
@@ -361,7 +367,8 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
361367
// it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's
362368
// not empty - we are in trouble anyway
363369
Preconditions.checkState(
364-
workflowStateMachines.takeLocalActivityRequests().isEmpty(),
370+
workflowStateMachines.takeLocalActivityRequests().isEmpty()
371+
|| context.isWorkflowMethodCompleted(),
365372
"[BUG] Local activities requests from the last event loop were not drained "
366373
+ "and accounted in the outstanding local activities counter");
367374
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,6 @@ public void handleUpdate(
161161
// Skip validator on replay
162162
if (!callbacks.isReplaying()) {
163163
try {
164-
// TODO(https://github.com/temporalio/sdk-java/issues/1748) handleValidateUpdate
165-
// should not just be run
166-
// in a workflow thread
167164
workflowContext.setReadOnly(true);
168165
workflowProc.handleValidateUpdate(updateName, input, eventId, header);
169166
} catch (Exception e) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.testing.internal.SDKTestWorkflowRule;
26+
import io.temporal.workflow.shared.TestActivities;
27+
import io.temporal.workflow.shared.TestWorkflows;
28+
import java.util.Objects;
29+
import org.junit.Rule;
30+
import org.junit.Test;
31+
32+
public class CommandInTheLastWorkflowTaskTest {
33+
@Rule
34+
public SDKTestWorkflowRule testWorkflowRule =
35+
SDKTestWorkflowRule.newBuilder()
36+
.setWorkflowTypes(TestWorkflowImpl.class)
37+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
38+
.build();
39+
40+
@Test
41+
public void testCommandInTheLastWorkflowTask() {
42+
TestWorkflows.TestWorkflowReturnString client =
43+
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
44+
assertEquals("done", client.execute());
45+
}
46+
47+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
48+
49+
@Override
50+
public String execute() {
51+
Async.procedure(
52+
() -> {
53+
Workflow.mutableSideEffect(
54+
"id1", Integer.class, (o, n) -> Objects.equals(n, o), () -> 0);
55+
});
56+
return "done";
57+
}
58+
}
59+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.activityTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.activity.ActivityOptions;
26+
import io.temporal.testing.internal.SDKTestWorkflowRule;
27+
import io.temporal.workflow.Async;
28+
import io.temporal.workflow.Workflow;
29+
import io.temporal.workflow.shared.TestActivities;
30+
import io.temporal.workflow.shared.TestWorkflows;
31+
import java.time.Duration;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
35+
public class ActivityInTheLastWorkflowTaskTest {
36+
@Rule
37+
public SDKTestWorkflowRule testWorkflowRule =
38+
SDKTestWorkflowRule.newBuilder()
39+
.setWorkflowTypes(TestWorkflowImpl.class)
40+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
41+
.build();
42+
43+
@Test
44+
public void testActivityInTheLastWorkflowTask() {
45+
TestWorkflows.TestWorkflowReturnString client =
46+
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
47+
assertEquals("done", client.execute());
48+
}
49+
50+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
51+
52+
private final TestActivities.VariousTestActivities activities =
53+
Workflow.newActivityStub(
54+
TestActivities.VariousTestActivities.class,
55+
ActivityOptions.newBuilder()
56+
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
57+
.build());
58+
59+
@Override
60+
public String execute() {
61+
Async.procedure(activities::sleepActivity, (long) 1000, 0);
62+
return "done";
63+
}
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.activityTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.activity.LocalActivityOptions;
26+
import io.temporal.testing.internal.SDKTestWorkflowRule;
27+
import io.temporal.workflow.*;
28+
import io.temporal.workflow.shared.TestActivities;
29+
import java.time.Duration;
30+
import junitparams.JUnitParamsRunner;
31+
import junitparams.Parameters;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
36+
@RunWith(JUnitParamsRunner.class)
37+
public class LocalActivityInTheLastWorkflowTaskTest {
38+
@Rule
39+
public SDKTestWorkflowRule testWorkflowRule =
40+
SDKTestWorkflowRule.newBuilder()
41+
.setWorkflowTypes(TestWorkflowImpl.class)
42+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
43+
.build();
44+
45+
@Test
46+
@Parameters({"true", "false"})
47+
public void testLocalActivityInTheLastWorkflowTask(boolean blockOnLA) {
48+
TestWorkflow client = testWorkflowRule.newWorkflowStub(TestWorkflow.class);
49+
assertEquals("done", client.execute(blockOnLA));
50+
}
51+
52+
@WorkflowInterface
53+
public interface TestWorkflow {
54+
@WorkflowMethod
55+
String execute(boolean blockOnLA);
56+
}
57+
58+
public static class TestWorkflowImpl implements TestWorkflow {
59+
60+
private final TestActivities.VariousTestActivities activities =
61+
Workflow.newLocalActivityStub(
62+
TestActivities.VariousTestActivities.class,
63+
LocalActivityOptions.newBuilder()
64+
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
65+
.build());
66+
67+
@Override
68+
public String execute(boolean blockOnLA) {
69+
if (blockOnLA) {
70+
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
71+
Async.procedure(activities::sleepActivity, (long) 1000, 0);
72+
promise.get();
73+
}
74+
Async.procedure(activities::sleepActivity, (long) 1000, 0);
75+
return "done";
76+
}
77+
}
78+
}

temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ public interface QueryableWorkflow {
177177
void mySignal(String value);
178178
}
179179

180+
@WorkflowInterface
181+
public interface SimpleWorkflowWithUpdate {
182+
183+
@WorkflowMethod
184+
String execute();
185+
186+
@UpdateMethod
187+
String update(String value);
188+
}
189+
180190
@WorkflowInterface
181191
public interface WorkflowWithUpdate {
182192

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.signalTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.activity.LocalActivityOptions;
26+
import io.temporal.client.WorkflowStub;
27+
import io.temporal.testing.internal.SDKTestWorkflowRule;
28+
import io.temporal.workflow.*;
29+
import io.temporal.workflow.shared.TestActivities;
30+
import java.time.Duration;
31+
import junitparams.JUnitParamsRunner;
32+
import junitparams.Parameters;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.runner.RunWith;
36+
37+
@RunWith(JUnitParamsRunner.class)
38+
public class SignalWithLocalActivityInTheLastWorkflowTaskTest {
39+
@Rule
40+
public SDKTestWorkflowRule testWorkflowRule =
41+
SDKTestWorkflowRule.newBuilder()
42+
.setWorkflowTypes(TestSignalWorkflowImpl.class)
43+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
44+
.build();
45+
46+
@Test
47+
@Parameters({"true", "false"})
48+
public void testSignalWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA) {
49+
TestSignaledWorkflow client = testWorkflowRule.newWorkflowStub(TestSignaledWorkflow.class);
50+
WorkflowStub.fromTyped(client)
51+
.signalWithStart("testSignal", new String[] {"signalValue"}, new Boolean[] {waitOnLA});
52+
assertEquals("done", client.execute());
53+
}
54+
55+
@WorkflowInterface
56+
public interface TestSignaledWorkflow {
57+
58+
@WorkflowMethod
59+
String execute();
60+
61+
@SignalMethod
62+
void signal(boolean waitOnLA);
63+
}
64+
65+
public static class TestSignalWorkflowImpl implements TestSignaledWorkflow {
66+
67+
private final TestActivities.VariousTestActivities activities =
68+
Workflow.newLocalActivityStub(
69+
TestActivities.VariousTestActivities.class,
70+
LocalActivityOptions.newBuilder()
71+
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
72+
.build());
73+
74+
@Override
75+
public String execute() {
76+
return "done";
77+
}
78+
79+
@Override
80+
public void signal(boolean waitOnLA) {
81+
if (waitOnLA) {
82+
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
83+
Async.procedure(activities::sleepActivity, (long) 1000, 0);
84+
promise.get();
85+
}
86+
87+
activities.sleepActivity(1000, 0);
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)