Skip to content

Commit 5b7e82c

Browse files
authored
Added retry options to ActivityInfo. Added ActivityInfo tests. (#2622)
1 parent 0dd76e0 commit 5b7e82c

File tree

4 files changed

+201
-5
lines changed

4 files changed

+201
-5
lines changed

temporal-sdk/src/main/java/io/temporal/activity/ActivityInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.temporal.api.common.v1.Payloads;
44
import io.temporal.common.Experimental;
55
import io.temporal.common.Priority;
6+
import io.temporal.common.RetryOptions;
67
import java.time.Duration;
78
import java.util.Optional;
89
import javax.annotation.Nonnull;
@@ -131,4 +132,9 @@ public interface ActivityInfo {
131132
@Experimental
132133
@Nonnull
133134
Priority getPriority();
135+
136+
/**
137+
* @return Retry options for the Activity Execution.
138+
*/
139+
RetryOptions getRetryOptions();
134140
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import io.temporal.api.common.v1.Payloads;
66
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
77
import io.temporal.common.Priority;
8+
import io.temporal.common.RetryOptions;
89
import io.temporal.internal.common.ProtoConverters;
910
import io.temporal.internal.common.ProtobufTimeUtils;
11+
import io.temporal.internal.common.RetryOptionsUtils;
1012
import io.temporal.workflow.Functions;
1113
import java.time.Duration;
1214
import java.util.Base64;
@@ -136,11 +138,17 @@ public boolean isLocal() {
136138
return local;
137139
}
138140

141+
@Nonnull
139142
@Override
140143
public Priority getPriority() {
141144
return ProtoConverters.fromProto(response.getPriority());
142145
}
143146

147+
@Override
148+
public RetryOptions getRetryOptions() {
149+
return RetryOptionsUtils.toRetryOptions(response.getRetryPolicy());
150+
}
151+
144152
@Override
145153
public Functions.Proc getCompletionHandle() {
146154
return completionHandle;
@@ -165,7 +173,7 @@ public Optional<Header> getHeader() {
165173
@Override
166174
public String toString() {
167175
return "WorkflowInfo{"
168-
+ ", workflowId="
176+
+ "workflowId="
169177
+ getWorkflowId()
170178
+ ", runId="
171179
+ getRunId()
@@ -191,11 +199,17 @@ public String toString() {
191199
+ getWorkflowType()
192200
+ ", namespace="
193201
+ getNamespace()
202+
+ ", activityTaskQueue="
203+
+ getActivityTaskQueue()
194204
+ ", attempt="
195205
+ getAttempt()
196206
+ ", isLocal="
197207
+ isLocal()
198-
+ "taskToken="
208+
+ ", priority="
209+
+ getPriority()
210+
+ ", retryOptions="
211+
+ getRetryOptions()
212+
+ ", taskToken="
199213
+ Base64.getEncoder().encodeToString(getTaskToken())
200214
+ '}';
201215
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package io.temporal.activity;
2+
3+
import io.temporal.common.RetryOptions;
4+
import io.temporal.testing.internal.SDKTestOptions;
5+
import io.temporal.testing.internal.SDKTestWorkflowRule;
6+
import io.temporal.workflow.Workflow;
7+
import io.temporal.workflow.WorkflowInterface;
8+
import io.temporal.workflow.WorkflowMethod;
9+
import java.time.Duration;
10+
import org.junit.Assert;
11+
import org.junit.Rule;
12+
import org.junit.Test;
13+
14+
public class ActivityInfoTest {
15+
public static class SerializedActivityInfo {
16+
public byte[] taskToken;
17+
public String workflowId;
18+
public String runId;
19+
public String activityId;
20+
public String activityType;
21+
public Duration scheduleToCloseTimeout;
22+
public Duration startToCloseTimeout;
23+
public Duration heartbeatTimeout;
24+
public String workflowType;
25+
public String namespace;
26+
public String activityTaskQueue;
27+
public boolean isLocal;
28+
public int priorityKey;
29+
public boolean hasRetryOptions;
30+
public Duration retryInitialInterval;
31+
public double retryBackoffCoefficient;
32+
public int retryMaximumAttempts;
33+
public Duration retryMaximumInterval;
34+
public String[] retryDoNotRetry;
35+
}
36+
37+
private static final RetryOptions RETRY_OPTIONS =
38+
RetryOptions.newBuilder()
39+
.setInitialInterval(Duration.ofSeconds(2))
40+
.setBackoffCoefficient(1.5)
41+
.setMaximumAttempts(5)
42+
.setMaximumInterval(Duration.ofSeconds(6))
43+
.setDoNotRetry("DoNotRetryThisType")
44+
.build();
45+
private static final ActivityOptions ACTIVITY_OPTIONS =
46+
ActivityOptions.newBuilder(SDKTestOptions.newActivityOptions())
47+
.setRetryOptions(RETRY_OPTIONS)
48+
.build();
49+
private static final LocalActivityOptions LOCAL_ACTIVITY_OPTIONS =
50+
LocalActivityOptions.newBuilder(SDKTestOptions.newLocalActivityOptions())
51+
.setRetryOptions(RETRY_OPTIONS)
52+
.build();
53+
54+
@Rule
55+
public SDKTestWorkflowRule testWorkflowRule =
56+
SDKTestWorkflowRule.newBuilder()
57+
.setWorkflowTypes(ActivityInfoWorkflowImpl.class)
58+
.setActivityImplementations(new ActivityInfoActivityImpl())
59+
.build();
60+
61+
@Test
62+
public void getActivityInfo() {
63+
ActivityInfoWorkflow workflow = testWorkflowRule.newWorkflowStub(ActivityInfoWorkflow.class);
64+
SerializedActivityInfo info = workflow.getActivityInfo(false);
65+
// Unpredictable values
66+
Assert.assertTrue(info.taskToken.length > 0);
67+
Assert.assertFalse(info.workflowId.isEmpty());
68+
Assert.assertFalse(info.runId.isEmpty());
69+
Assert.assertFalse(info.activityId.isEmpty());
70+
// Predictable values
71+
Assert.assertEquals(ActivityInfoActivity.ACTIVITY_NAME, info.activityType);
72+
Assert.assertEquals(ACTIVITY_OPTIONS.getScheduleToCloseTimeout(), info.scheduleToCloseTimeout);
73+
Assert.assertEquals(ACTIVITY_OPTIONS.getStartToCloseTimeout(), info.startToCloseTimeout);
74+
Assert.assertEquals(ACTIVITY_OPTIONS.getHeartbeatTimeout(), info.heartbeatTimeout);
75+
Assert.assertEquals(ActivityInfoWorkflow.class.getSimpleName(), info.workflowType);
76+
Assert.assertEquals(SDKTestWorkflowRule.NAMESPACE, info.namespace);
77+
Assert.assertEquals(testWorkflowRule.getTaskQueue(), info.activityTaskQueue);
78+
Assert.assertFalse(info.isLocal);
79+
Assert.assertEquals(0, info.priorityKey);
80+
// Server controls retry options so we can't make assertions what they are,
81+
// but they should be present
82+
Assert.assertTrue(info.hasRetryOptions);
83+
}
84+
85+
@Test
86+
public void getLocalActivityInfo() {
87+
ActivityInfoWorkflow workflow = testWorkflowRule.newWorkflowStub(ActivityInfoWorkflow.class);
88+
SerializedActivityInfo info = workflow.getActivityInfo(true);
89+
// Unpredictable values
90+
Assert.assertFalse(info.workflowId.isEmpty());
91+
Assert.assertFalse(info.runId.isEmpty());
92+
Assert.assertFalse(info.activityId.isEmpty());
93+
// Predictable values
94+
Assert.assertEquals(0, info.taskToken.length);
95+
Assert.assertEquals(ActivityInfoActivity.ACTIVITY_NAME, info.activityType);
96+
Assert.assertEquals(
97+
LOCAL_ACTIVITY_OPTIONS.getScheduleToCloseTimeout(), info.scheduleToCloseTimeout);
98+
Assert.assertTrue(info.startToCloseTimeout.isZero());
99+
Assert.assertTrue(info.heartbeatTimeout.isZero());
100+
Assert.assertEquals(ActivityInfoWorkflow.class.getSimpleName(), info.workflowType);
101+
Assert.assertEquals(SDKTestWorkflowRule.NAMESPACE, info.namespace);
102+
Assert.assertEquals(testWorkflowRule.getTaskQueue(), info.activityTaskQueue);
103+
Assert.assertTrue(info.isLocal);
104+
Assert.assertEquals(0, info.priorityKey);
105+
Assert.assertTrue(info.hasRetryOptions);
106+
Assert.assertEquals(RETRY_OPTIONS.getInitialInterval(), info.retryInitialInterval);
107+
Assert.assertEquals(RETRY_OPTIONS.getBackoffCoefficient(), info.retryBackoffCoefficient, 0);
108+
Assert.assertEquals(RETRY_OPTIONS.getMaximumAttempts(), info.retryMaximumAttempts);
109+
Assert.assertEquals(RETRY_OPTIONS.getMaximumInterval(), info.retryMaximumInterval);
110+
Assert.assertArrayEquals(RETRY_OPTIONS.getDoNotRetry(), info.retryDoNotRetry);
111+
}
112+
113+
@WorkflowInterface
114+
public interface ActivityInfoWorkflow {
115+
@WorkflowMethod
116+
SerializedActivityInfo getActivityInfo(boolean isLocal);
117+
}
118+
119+
public static class ActivityInfoWorkflowImpl implements ActivityInfoWorkflow {
120+
private final ActivityInfoActivity activity =
121+
Workflow.newActivityStub(ActivityInfoActivity.class, ACTIVITY_OPTIONS);
122+
private final ActivityInfoActivity localActivity =
123+
Workflow.newLocalActivityStub(ActivityInfoActivity.class, LOCAL_ACTIVITY_OPTIONS);
124+
125+
@Override
126+
public SerializedActivityInfo getActivityInfo(boolean isLocal) {
127+
if (isLocal) {
128+
return localActivity.getActivityInfo();
129+
} else {
130+
return activity.getActivityInfo();
131+
}
132+
}
133+
}
134+
135+
@ActivityInterface
136+
public interface ActivityInfoActivity {
137+
public static final String ACTIVITY_NAME = "ActivityName_getActivityInfo";
138+
139+
@ActivityMethod(name = ACTIVITY_NAME)
140+
SerializedActivityInfo getActivityInfo();
141+
}
142+
143+
public static class ActivityInfoActivityImpl implements ActivityInfoActivity {
144+
@Override
145+
public SerializedActivityInfo getActivityInfo() {
146+
ActivityInfo info = Activity.getExecutionContext().getInfo();
147+
SerializedActivityInfo serialized = new SerializedActivityInfo();
148+
serialized.taskToken = info.getTaskToken();
149+
serialized.workflowId = info.getWorkflowId();
150+
serialized.runId = info.getRunId();
151+
serialized.activityId = info.getActivityId();
152+
serialized.activityType = info.getActivityType();
153+
serialized.scheduleToCloseTimeout = info.getScheduleToCloseTimeout();
154+
serialized.startToCloseTimeout = info.getStartToCloseTimeout();
155+
serialized.heartbeatTimeout = info.getHeartbeatTimeout();
156+
serialized.workflowType = info.getWorkflowType();
157+
serialized.namespace = info.getNamespace();
158+
serialized.activityTaskQueue = info.getActivityTaskQueue();
159+
serialized.isLocal = info.isLocal();
160+
serialized.priorityKey = info.getPriority().getPriorityKey();
161+
if (info.getRetryOptions() != null) {
162+
serialized.hasRetryOptions = true;
163+
serialized.retryInitialInterval = info.getRetryOptions().getInitialInterval();
164+
serialized.retryBackoffCoefficient = info.getRetryOptions().getBackoffCoefficient();
165+
serialized.retryMaximumAttempts = info.getRetryOptions().getMaximumAttempts();
166+
serialized.retryMaximumInterval = info.getRetryOptions().getMaximumInterval();
167+
if (info.getRetryOptions().getDoNotRetry() != null) {
168+
serialized.retryDoNotRetry = info.getRetryOptions().getDoNotRetry();
169+
}
170+
}
171+
return serialized;
172+
}
173+
}
174+
}

temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestOptions.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,16 @@ public static LocalActivityOptions newLocalActivityOptions20sScheduleToClose() {
4444
.build();
4545
}
4646

47-
public static ActivityOptions newActivityOptionsForTaskQueue(String taskQueue) {
47+
public static ActivityOptions newActivityOptions() {
4848
if (DEBUGGER_TIMEOUTS) {
4949
return ActivityOptions.newBuilder()
50-
.setTaskQueue(taskQueue)
5150
.setScheduleToCloseTimeout(Duration.ofSeconds(1000))
5251
.setHeartbeatTimeout(Duration.ofSeconds(1000))
5352
.setScheduleToStartTimeout(Duration.ofSeconds(1000))
5453
.setStartToCloseTimeout(Duration.ofSeconds(10000))
5554
.build();
5655
} else {
5756
return ActivityOptions.newBuilder()
58-
.setTaskQueue(taskQueue)
5957
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
6058
.setHeartbeatTimeout(Duration.ofSeconds(5))
6159
.setScheduleToStartTimeout(Duration.ofSeconds(5))
@@ -64,6 +62,10 @@ public static ActivityOptions newActivityOptionsForTaskQueue(String taskQueue) {
6462
}
6563
}
6664

65+
public static ActivityOptions newActivityOptionsForTaskQueue(String taskQueue) {
66+
return ActivityOptions.newBuilder(newActivityOptions()).setTaskQueue(taskQueue).build();
67+
}
68+
6769
public static LocalActivityOptions newLocalActivityOptions() {
6870
if (DEBUGGER_TIMEOUTS) {
6971
return LocalActivityOptions.newBuilder()

0 commit comments

Comments
 (0)