Skip to content

Commit 946a692

Browse files
authored
Added support for activity server side retry (#233)
* Added support for server side activity retry * Added activity retry to the testservice * Added Activity.getHeartbeatDetails() * Changed RetryPolicy to not require expiration when maxAttempts is present
1 parent 225245e commit 946a692

31 files changed

+1202
-175
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ test {
182182
events "passed", "skipped", "failed"
183183
exceptionFormat "full"
184184
// Uncomment the following line if you want to see test logs in gradlew run.
185-
// showStandardStreams true
185+
showStandardStreams true
186186
}
187187
}
188188

src/main/java/com/uber/cadence/activity/Activity.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.uber.cadence.serviceclient.IWorkflowService;
2323
import com.uber.cadence.workflow.ActivityException;
2424
import com.uber.cadence.workflow.ActivityTimeoutException;
25+
import java.lang.reflect.Type;
26+
import java.util.Optional;
2527
import java.util.concurrent.CancellationException;
2628

2729
/**
@@ -238,6 +240,32 @@ public static void heartbeat(Object details) throws CancellationException {
238240
ActivityInternal.recordActivityHeartbeat(details);
239241
}
240242

243+
/**
244+
* Extracts heartbeat details from the last failed attempt. This is used in combination with retry
245+
* options. An activity could be scheduled with an optional {@link
246+
* com.uber.cadence.common.RetryOptions} on {@link ActivityOptions}. If an activity failed then
247+
* the server would attempt to dispatch another activity task to retry according to the retry
248+
* options. If there was heartbeat details reported by the activity from the failed attempt, the
249+
* details would be delivered along with the activity task for the retry attempt. The activity
250+
* could extract the details by {@link #getHeartbeatDetails(Class)}() and resume from the
251+
* progress.
252+
*
253+
* @param detailsClass type of the heartbeat details
254+
*/
255+
public static <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass) {
256+
return ActivityInternal.getHeartbeatDetails(detailsClass, detailsClass);
257+
}
258+
259+
/**
260+
* Similar to {@link #getHeartbeatDetails(Class)}. Use when details is of a generic type.
261+
*
262+
* @param detailsClass type of the heartbeat details
263+
* @param detailsType type including generic information of the heartbeat details.
264+
*/
265+
public static <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsType) {
266+
return ActivityInternal.getHeartbeatDetails(detailsClass, detailsType);
267+
}
268+
241269
/**
242270
* @return an instance of the Simple Workflow Java client that is the same used by the invoked
243271
* activity worker. It can be useful if activity wants to use WorkflowClient for some

src/main/java/com/uber/cadence/activity/ActivityOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,17 @@ public ActivityOptions validateAndBuildWithDefaults() {
175175
if (heartbeatTimeout == null) {
176176
heartbeat = scheduleToClose;
177177
}
178+
RetryOptions ro = null;
179+
if (retryOptions != null) {
180+
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
181+
}
178182
return new ActivityOptions(
179183
roundUpToSeconds(heartbeat),
180184
roundUpToSeconds(scheduleToClose),
181185
roundUpToSeconds(scheduleToStart),
182186
roundUpToSeconds(startToClose),
183187
taskList,
184-
retryOptions);
188+
ro);
185189
}
186190
}
187191

src/main/java/com/uber/cadence/activity/ActivityTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,6 @@ public interface ActivityTask {
5656
Duration getStartToCloseTimeout();
5757

5858
Duration getHeartbeatTimeout();
59+
60+
byte[] getHeartbeatDetails();
5961
}

src/main/java/com/uber/cadence/common/RetryOptions.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -208,35 +208,15 @@ public RetryOptions build() {
208208

209209
/** Validates property values and builds RetryOptions with default values. */
210210
public RetryOptions validateBuildWithDefaults() {
211-
validate();
212211
double backoff = backoffCoefficient;
213212
if (backoff == 0d) {
214213
backoff = DEFAULT_BACKOFF_COEFFICIENT;
215214
}
216-
return new RetryOptions(
217-
initialInterval, backoff, expiration, maximumAttempts, maximumInterval, doNotRetry);
218-
}
219-
220-
private void validate() {
221-
if (initialInterval == null) {
222-
throw new IllegalArgumentException("required property initialInterval not set");
223-
}
224-
if (expiration == null) {
225-
throw new IllegalArgumentException("required property expiration is not set");
226-
}
227-
if (maximumInterval != null && maximumInterval.compareTo(initialInterval) == -1) {
228-
throw new IllegalStateException(
229-
"maximumInterval("
230-
+ maximumInterval
231-
+ ") cannot be smaller than initialInterval("
232-
+ initialInterval);
233-
}
234-
if (backoffCoefficient != 0d && backoffCoefficient < 1d) {
235-
throw new IllegalArgumentException("coefficient less than 1: " + backoffCoefficient);
236-
}
237-
if (maximumAttempts != 0 && maximumAttempts < 0) {
238-
throw new IllegalArgumentException("negative maximum attempts");
239-
}
215+
RetryOptions result =
216+
new RetryOptions(
217+
initialInterval, backoff, expiration, maximumAttempts, maximumInterval, doNotRetry);
218+
result.validate();
219+
return result;
240220
}
241221
}
242222

@@ -291,6 +271,10 @@ public void validate() {
291271
if (initialInterval == null) {
292272
throw new IllegalStateException("required property initialInterval not set");
293273
}
274+
if (expiration == null && maximumAttempts <= 0) {
275+
throw new IllegalArgumentException(
276+
"both MaximumAttempts and Expiration on retry policy are not set, at least one of them must be set");
277+
}
294278
if (maximumInterval != null && maximumInterval.compareTo(initialInterval) == -1) {
295279
throw new IllegalStateException(
296280
"maximumInterval("

src/main/java/com/uber/cadence/internal/common/RetryParameters.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
21+
22+
import com.uber.cadence.RetryPolicy;
23+
import com.uber.cadence.common.RetryOptions;
2024
import com.uber.m3.util.ImmutableList;
25+
import java.util.ArrayList;
2126
import java.util.List;
2227

23-
public class RetryParameters {
28+
public final class RetryParameters {
2429

2530
public int initialIntervalInSeconds;
2631
public double backoffCoefficient;
@@ -29,6 +34,29 @@ public class RetryParameters {
2934
public List<String> nonRetriableErrorReasons;
3035
public int expirationIntervalInSeconds;
3136

37+
public RetryParameters(RetryOptions retryOptions) {
38+
setBackoffCoefficient(retryOptions.getBackoffCoefficient());
39+
setExpirationIntervalInSeconds(
40+
(int) roundUpToSeconds(retryOptions.getExpiration()).getSeconds());
41+
setMaximumAttempts(retryOptions.getMaximumAttempts());
42+
setInitialIntervalInSeconds(
43+
(int) roundUpToSeconds(retryOptions.getInitialInterval()).getSeconds());
44+
setMaximumIntervalInSeconds(
45+
(int) roundUpToSeconds(retryOptions.getMaximumInterval()).getSeconds());
46+
// Use exception type name as the reason
47+
List<String> reasons = new ArrayList<>();
48+
// Use exception type name as the reason
49+
List<Class<? extends Throwable>> doNotRetry = retryOptions.getDoNotRetry();
50+
if (doNotRetry != null) {
51+
for (Class<? extends Throwable> r : doNotRetry) {
52+
reasons.add(r.getName());
53+
}
54+
setNonRetriableErrorReasons(reasons);
55+
}
56+
}
57+
58+
public RetryParameters() {}
59+
3260
public int getInitialIntervalInSeconds() {
3361
return initialIntervalInSeconds;
3462
}
@@ -87,4 +115,32 @@ public RetryParameters copy() {
87115
result.setBackoffCoefficient(backoffCoefficient);
88116
return result;
89117
}
118+
119+
public RetryPolicy toRetryPolicy() {
120+
return new RetryPolicy()
121+
.setNonRetriableErrorReasons(getNonRetriableErrorReasons())
122+
.setMaximumAttempts(getMaximumAttempts())
123+
.setInitialIntervalInSeconds(getInitialIntervalInSeconds())
124+
.setExpirationIntervalInSeconds(getExpirationIntervalInSeconds())
125+
.setBackoffCoefficient(getBackoffCoefficient())
126+
.setMaximumIntervalInSeconds(getMaximumIntervalInSeconds());
127+
}
128+
129+
@Override
130+
public String toString() {
131+
return "RetryParameters{"
132+
+ "initialIntervalInSeconds="
133+
+ initialIntervalInSeconds
134+
+ ", backoffCoefficient="
135+
+ backoffCoefficient
136+
+ ", maximumIntervalInSeconds="
137+
+ maximumIntervalInSeconds
138+
+ ", maximumAttempts="
139+
+ maximumAttempts
140+
+ ", nonRetriableErrorReasons="
141+
+ nonRetriableErrorReasons
142+
+ ", expirationIntervalInSeconds="
143+
+ expirationIntervalInSeconds
144+
+ '}';
145+
}
90146
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public class WorkflowExecutionUtils {
9494
new RetryOptions.Builder()
9595
.setBackoffCoefficient(2)
9696
.setInitialInterval(Duration.ofMillis(500))
97-
.setMaximumInterval(Duration.ofSeconds(10))
97+
.setMaximumInterval(Duration.ofSeconds(30))
98+
.setMaximumAttempts(Integer.MAX_VALUE)
9899
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
99100
.build();
100101

src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
2727
import com.uber.cadence.TaskList;
2828
import com.uber.cadence.TimeoutType;
29+
import com.uber.cadence.internal.common.RetryParameters;
2930
import java.util.HashMap;
3031
import java.util.Map;
3132
import java.util.concurrent.CancellationException;
@@ -81,6 +82,10 @@ public void accept(Exception cause) {
8182
this.decisions = decisions;
8283
}
8384

85+
public boolean isActivityScheduledWithRetryOptions() {
86+
return decisions.isActivityScheduledWithRetryOptions();
87+
}
88+
8489
Consumer<Exception> scheduleActivityTask(
8590
ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
8691
final OpenRequestInfo<byte[], ActivityType> context =
@@ -111,6 +116,11 @@ Consumer<Exception> scheduleActivityTask(
111116
tl.setName(taskList);
112117
attributes.setTaskList(tl);
113118
}
119+
RetryParameters retryParameters = parameters.getRetryParameters();
120+
if (retryParameters != null) {
121+
attributes.setRetryPolicy(retryParameters.toRetryPolicy());
122+
}
123+
114124
long scheduledEventId = decisions.scheduleActivityTask(attributes);
115125
context.setCompletionHandle(callback);
116126
scheduledActivities.put(scheduledEventId, context);

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,19 @@ Consumer<Exception> startChildWorkflow(
9696
BiConsumer<byte[], Exception> callback);
9797

9898
/**
99-
* Is the next event in the history is child workflow initiated event and it has attached retry
99+
* Is the next event in the history is child workflow initiated event and it has an attached retry
100100
* policy. Used for backwards compatibility with the code that used local workflow retry when
101101
* RetryOptions were specified.
102102
*/
103103
boolean isServerSideChildWorkflowRetry();
104104

105+
/**
106+
* Is the next event in the history is an activity scheduled event and it has an attached retry
107+
* policy. Used for the backwards compatibility with the code that used local activity retry when
108+
* RetryOptions were specified.
109+
*/
110+
boolean isServerSideActivityRetry();
111+
105112
Consumer<Exception> signalWorkflowExecution(
106113
SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback);
107114

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ public boolean isServerSideChildWorkflowRetry() {
172172
return workflowClient.isChildWorkflowExecutionStartedWithRetryOptions();
173173
}
174174

175+
@Override
176+
public boolean isServerSideActivityRetry() {
177+
return activityClient.isActivityScheduledWithRetryOptions();
178+
}
179+
175180
@Override
176181
public Consumer<Exception> signalWorkflowExecution(
177182
SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback) {

0 commit comments

Comments
 (0)