Skip to content

Commit d310594

Browse files
Add support for activity reset (#2546)
Add support for activity reset
1 parent 4acf674 commit d310594

File tree

5 files changed

+146
-3
lines changed

5 files changed

+146
-3
lines changed

temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.temporal.client;
22

33
import io.temporal.activity.ActivityInfo;
4+
import io.temporal.common.Experimental;
45

56
/***
67
* Indicates that the activity was paused by the user.
78
*
89
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
910
*/
11+
@Experimental
1012
public final class ActivityPausedException extends ActivityCompletionException {
1113
public ActivityPausedException(ActivityInfo info) {
1214
super(info);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.temporal.client;
2+
3+
import io.temporal.activity.ActivityInfo;
4+
import io.temporal.common.Experimental;
5+
6+
/***
7+
* Indicates that the activity attempt was reset by the user.
8+
*
9+
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
10+
*/
11+
@Experimental
12+
public final class ActivityResetException extends ActivityCompletionException {
13+
public ActivityResetException(ActivityInfo info) {
14+
super(info);
15+
}
16+
17+
public ActivityResetException() {
18+
super();
19+
}
20+
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ private void sendHeartbeatRequest(Object details) {
226226
metricsScope);
227227
if (status.getCancelRequested()) {
228228
lastException = new ActivityCanceledException(info);
229+
} else if (status.getActivityReset()) {
230+
lastException = new ActivityResetException(info);
229231
} else if (status.getActivityPaused()) {
230232
lastException = new ActivityPausedException(info);
231233
} else {

temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import io.temporal.api.common.v1.Payloads;
1212
import io.temporal.api.common.v1.WorkflowExecution;
1313
import io.temporal.api.workflowservice.v1.*;
14-
import io.temporal.client.ActivityCanceledException;
15-
import io.temporal.client.ActivityCompletionFailureException;
16-
import io.temporal.client.ActivityNotExistsException;
14+
import io.temporal.client.*;
1715
import io.temporal.common.converter.DataConverter;
1816
import io.temporal.failure.CanceledFailure;
1917
import io.temporal.internal.client.ActivityClientHelper;
@@ -190,6 +188,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
190188
metricsScope);
191189
if (status.getCancelRequested()) {
192190
throw new ActivityCanceledException();
191+
} else if (status.getActivityReset()) {
192+
throw new ActivityResetException();
193+
} else if (status.getActivityPaused()) {
194+
throw new ActivityPausedException();
193195
}
194196
} else {
195197
RecordActivityTaskHeartbeatByIdResponse status =
@@ -203,6 +205,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
203205
metricsScope);
204206
if (status.getCancelRequested()) {
205207
throw new ActivityCanceledException();
208+
} else if (status.getActivityReset()) {
209+
throw new ActivityResetException();
210+
} else if (status.getActivityPaused()) {
211+
throw new ActivityPausedException();
206212
}
207213
}
208214
} catch (Exception e) {
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package io.temporal.activity;
2+
3+
import static org.junit.Assume.assumeTrue;
4+
5+
import io.temporal.api.common.v1.WorkflowExecution;
6+
import io.temporal.api.workflow.v1.PendingActivityInfo;
7+
import io.temporal.api.workflowservice.v1.ResetActivityRequest;
8+
import io.temporal.client.ActivityResetException;
9+
import io.temporal.client.WorkflowStub;
10+
import io.temporal.common.converter.GlobalDataConverter;
11+
import io.temporal.testing.internal.SDKTestOptions;
12+
import io.temporal.testing.internal.SDKTestWorkflowRule;
13+
import io.temporal.workflow.Async;
14+
import io.temporal.workflow.Workflow;
15+
import io.temporal.workflow.shared.TestActivities;
16+
import io.temporal.workflow.shared.TestWorkflows;
17+
import java.time.Duration;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import org.junit.Assert;
20+
import org.junit.Rule;
21+
import org.junit.Test;
22+
23+
public class ActivityResetTest {
24+
25+
@Rule
26+
public SDKTestWorkflowRule testWorkflowRule =
27+
SDKTestWorkflowRule.newBuilder()
28+
.setWorkflowTypes(TestWorkflowImpl.class)
29+
.setActivityImplementations(new HeartBeatingActivityImpl())
30+
.build();
31+
32+
@Test
33+
public void activityReset() {
34+
assumeTrue(
35+
"Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);
36+
37+
TestWorkflows.TestWorkflowReturnString workflow =
38+
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
39+
Assert.assertEquals("I am stopped after reset", workflow.execute());
40+
Assert.assertEquals(
41+
1,
42+
WorkflowStub.fromTyped(workflow)
43+
.describe()
44+
.getRawDescription()
45+
.getPendingActivitiesCount());
46+
PendingActivityInfo activityInfo =
47+
WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
48+
Assert.assertEquals(
49+
"1",
50+
GlobalDataConverter.get()
51+
.fromPayload(
52+
activityInfo.getHeartbeatDetails().getPayloads(0), String.class, String.class));
53+
}
54+
55+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
56+
57+
private final TestActivities.TestActivity1 activities =
58+
Workflow.newActivityStub(
59+
TestActivities.TestActivity1.class,
60+
SDKTestOptions.newActivityOptions20sScheduleToClose());
61+
62+
@Override
63+
public String execute() {
64+
Async.function(activities::execute, "");
65+
Workflow.sleep(Duration.ofSeconds(1));
66+
return activities.execute("CompleteOnPause");
67+
}
68+
}
69+
70+
public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
71+
private final AtomicInteger resetCounter = new AtomicInteger(0);
72+
73+
@Override
74+
public String execute(String arg) {
75+
ActivityInfo info = Activity.getExecutionContext().getInfo();
76+
// Have the activity pause itself
77+
Activity.getExecutionContext()
78+
.getWorkflowClient()
79+
.getWorkflowServiceStubs()
80+
.blockingStub()
81+
.resetActivity(
82+
ResetActivityRequest.newBuilder()
83+
.setNamespace(info.getNamespace())
84+
.setExecution(
85+
WorkflowExecution.newBuilder()
86+
.setWorkflowId(info.getWorkflowId())
87+
.setRunId(info.getRunId())
88+
.build())
89+
.setId(info.getActivityId())
90+
.build());
91+
while (true) {
92+
try {
93+
Thread.sleep(1000);
94+
// Check if the activity has been reset, and the activity info shows we are on the 1st
95+
// attempt.
96+
if (resetCounter.get() >= 1
97+
&& Activity.getExecutionContext().getInfo().getAttempt() == 1) {
98+
return "I am stopped after reset";
99+
}
100+
// Heartbeat and verify that the correct exception is thrown
101+
Activity.getExecutionContext().heartbeat("1");
102+
} catch (ActivityResetException pe) {
103+
// Counter is incremented to track the number of resets
104+
resetCounter.addAndGet(1);
105+
// This will fail the attempt, and the activity will be retried.
106+
throw pe;
107+
} catch (InterruptedException e) {
108+
throw new RuntimeException(e);
109+
}
110+
}
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)