Skip to content

Commit fd61adb

Browse files
author
Liang Mei
committed
Cleanup retryer and retry options
1 parent f6cca51 commit fd61adb

15 files changed

+67
-220
lines changed

src/main/java/com/uber/cadence/internal/common/Retryer.java renamed to src/main/java/com/uber/cadence/internal/common/RpcRetryer.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@
1919

2020
import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap;
2121

22-
import com.uber.cadence.*;
22+
import com.uber.cadence.BadRequestError;
23+
import com.uber.cadence.CancellationAlreadyRequestedError;
24+
import com.uber.cadence.DomainAlreadyExistsError;
25+
import com.uber.cadence.DomainNotActiveError;
26+
import com.uber.cadence.EntityNotExistsError;
27+
import com.uber.cadence.QueryFailedError;
28+
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
2329
import com.uber.cadence.common.RetryOptions;
2430
import java.time.Duration;
2531
import java.util.concurrent.CancellationException;
@@ -31,8 +37,8 @@
3137
import org.slf4j.Logger;
3238
import org.slf4j.LoggerFactory;
3339

34-
public final class Retryer {
35-
public static final RetryOptions DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
40+
public final class RpcRetryer {
41+
public static final RetryOptions DEFAULT_RPC_RETRY_OPTIONS;
3642

3743
private static final Duration RETRY_SERVICE_OPERATION_INITIAL_INTERVAL = Duration.ofMillis(20);
3844
private static final Duration RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL = Duration.ofMinutes(1);
@@ -57,8 +63,9 @@ public final class Retryer {
5763
DomainAlreadyExistsError.class,
5864
QueryFailedError.class,
5965
DomainNotActiveError.class,
60-
CancellationAlreadyRequestedError.class);
61-
DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults();
66+
CancellationAlreadyRequestedError.class,
67+
DomainNotActiveError.class);
68+
DEFAULT_RPC_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults();
6269
}
6370

6471
public interface RetryableProc<E extends Throwable> {
@@ -81,7 +88,7 @@ private static class ValueExceptionPair<V> {
8188
private final CompletableFuture<V> value;
8289
private final Throwable exception;
8390

84-
public ValueExceptionPair(CompletableFuture<V> value, Throwable exception) {
91+
ValueExceptionPair(CompletableFuture<V> value, Throwable exception) {
8592
this.value = value;
8693
this.exception = exception;
8794
}
@@ -95,7 +102,7 @@ public Throwable getException() {
95102
}
96103
}
97104

98-
private static final Logger log = LoggerFactory.getLogger(Retryer.class);
105+
private static final Logger log = LoggerFactory.getLogger(RpcRetryer.class);
99106

100107
public static <T extends Throwable> void retry(RetryOptions options, RetryableProc<T> r)
101108
throws T {
@@ -107,6 +114,10 @@ public static <T extends Throwable> void retry(RetryOptions options, RetryablePr
107114
});
108115
}
109116

117+
public static <T extends Throwable> void retryWithDefaultOption(RetryableProc<T> r) throws T {
118+
retry(DEFAULT_RPC_RETRY_OPTIONS, r);
119+
}
120+
110121
public static <R, T extends Throwable> R retryWithResult(
111122
RetryOptions options, RetryableFunc<R, T> r) throws T {
112123
int attempt = 0;
@@ -266,5 +277,5 @@ private static <T extends Throwable> void rethrow(Exception e) throws T {
266277
}
267278

268279
/** Prohibits instantiation. */
269-
private Retryer() {}
280+
private RpcRetryer() {}
270281
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.gson.JsonParser;
2828
import com.google.gson.JsonPrimitive;
2929
import com.uber.cadence.ActivityType;
30-
import com.uber.cadence.BadRequestError;
3130
import com.uber.cadence.Decision;
3231
import com.uber.cadence.DecisionType;
3332
import com.uber.cadence.DescribeWorkflowExecutionRequest;
@@ -91,15 +90,6 @@ public class WorkflowExecutionUtils {
9190
*/
9291
private static final String INDENTATION = " ";
9392

94-
private static RetryOptions retryParameters =
95-
new RetryOptions.Builder()
96-
.setBackoffCoefficient(2)
97-
.setInitialInterval(Duration.ofMillis(500))
98-
.setMaximumInterval(Duration.ofSeconds(30))
99-
.setMaximumAttempts(Integer.MAX_VALUE)
100-
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
101-
.build();
102-
10393
// Wait period for passive cluster to retry getting workflow result in case of replication delay.
10494
private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500;
10595

@@ -216,7 +206,7 @@ private static HistoryEvent getInstanceCloseEvent(
216206
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
217207
try {
218208
response =
219-
Retryer.retryWithResult(
209+
RpcRetryer.retryWithResult(
220210
retryOptions,
221211
() -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout)));
222212
} catch (EntityNotExistsError e) {
@@ -343,7 +333,7 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
343333
}
344334

345335
private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) {
346-
return new RetryOptions.Builder(retryParameters)
336+
return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
347337
.setExpiration(Duration.ofSeconds(unit.toSeconds(timeout)))
348338
.build();
349339
}
@@ -355,7 +345,7 @@ private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit uni
355345
long timeout,
356346
TimeUnit unit) {
357347
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
358-
return Retryer.retryWithResultAsync(
348+
return RpcRetryer.retryWithResultAsync(
359349
retryOptions,
360350
() -> {
361351
CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,8 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
118118
StartWorkflowExecutionResponse result;
119119
try {
120120
result =
121-
Retryer.retryWithResult(
122-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
123-
() -> service.StartWorkflowExecution(request));
121+
RpcRetryer.retryWithResult(
122+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.StartWorkflowExecution(request));
124123
} catch (WorkflowExecutionAlreadyStartedError e) {
125124
throw e;
126125
} catch (TException e) {
@@ -137,7 +136,7 @@ private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutI
137136
if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) {
138137
return o;
139138
}
140-
return new RetryOptions.Builder(Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS)
139+
return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
141140
.setExpiration(Duration.ofMillis((timeoutInMillis)))
142141
.build();
143142
}
@@ -146,9 +145,8 @@ private CompletableFuture<WorkflowExecution> startWorkflowAsyncInternal(
146145
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
147146
StartWorkflowExecutionRequest request = getStartRequest(startParameters);
148147

149-
return Retryer.retryWithResultAsync(
150-
getRetryOptionsWithExpiration(
151-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
148+
return RpcRetryer.retryWithResultAsync(
149+
getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
152150
() -> {
153151
CompletableFuture<WorkflowExecution> result = new CompletableFuture<>();
154152
try {
@@ -272,9 +270,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
272270
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
273271

274272
try {
275-
Retryer.retry(
276-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
277-
() -> service.SignalWorkflowExecution(request));
273+
RpcRetryer.retryWithDefaultOption(() -> service.SignalWorkflowExecution(request));
278274
} catch (TException e) {
279275
throw CheckedExceptionWrapper.wrap(e);
280276
}
@@ -290,9 +286,8 @@ public CompletableFuture<Void> signalWorkflowExecutionAsync(
290286
public CompletableFuture<Void> signalWorkflowExecutionAsync(
291287
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
292288
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
293-
return Retryer.retryWithResultAsync(
294-
getRetryOptionsWithExpiration(
295-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
289+
return RpcRetryer.retryWithResultAsync(
290+
getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
296291
() -> {
297292
CompletableFuture<Void> result = new CompletableFuture<>();
298293
try {
@@ -387,8 +382,8 @@ private WorkflowExecution signalWithStartWorkflowInternal(
387382
StartWorkflowExecutionResponse result;
388383
try {
389384
result =
390-
Retryer.retryWithResult(
391-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
385+
RpcRetryer.retryWithResult(
386+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
392387
() -> service.SignalWithStartWorkflowExecution(request));
393388
} catch (TException e) {
394389
throw CheckedExceptionWrapper.wrap(e);
@@ -405,9 +400,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) {
405400
request.setDomain(domain);
406401
request.setWorkflowExecution(execution);
407402
try {
408-
Retryer.retry(
409-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
410-
() -> service.RequestCancelWorkflowExecution(request));
403+
RpcRetryer.retryWithDefaultOption(() -> service.RequestCancelWorkflowExecution(request));
411404
} catch (TException e) {
412405
throw CheckedExceptionWrapper.wrap(e);
413406
}
@@ -427,9 +420,8 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete
427420
request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
428421
try {
429422
QueryWorkflowResponse response =
430-
Retryer.retryWithResult(
431-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
432-
() -> service.QueryWorkflow(request));
423+
RpcRetryer.retryWithResult(
424+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.QueryWorkflow(request));
433425
return response;
434426
} catch (TException e) {
435427
throw CheckedExceptionWrapper.wrap(e);
@@ -451,9 +443,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term
451443
request.setReason(terminateParameters.getReason());
452444
// request.setChildPolicy(terminateParameters.getChildPolicy());
453445
try {
454-
Retryer.retry(
455-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
456-
() -> service.TerminateWorkflowExecution(request));
446+
RpcRetryer.retryWithDefaultOption(() -> service.TerminateWorkflowExecution(request));
457447
} catch (TException e) {
458448
throw CheckedExceptionWrapper.wrap(e);
459449
}

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.uber.cadence.client.ActivityCompletionFailureException;
3232
import com.uber.cadence.client.ActivityNotExistsException;
3333
import com.uber.cadence.converter.DataConverter;
34-
import com.uber.cadence.internal.common.Retryer;
34+
import com.uber.cadence.internal.common.RpcRetryer;
3535
import com.uber.cadence.internal.metrics.MetricsType;
3636
import com.uber.cadence.serviceclient.IWorkflowService;
3737
import com.uber.m3.tally.Scope;
@@ -95,9 +95,7 @@ public void complete(Object result) {
9595
request.setResult(convertedResult);
9696
request.setTaskToken(taskToken);
9797
try {
98-
Retryer.retry(
99-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
100-
() -> service.RespondActivityTaskCompleted(request));
98+
RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskCompleted(request));
10199
metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
102100
} catch (EntityNotExistsError e) {
103101
throw new ActivityNotExistsException(e);
@@ -139,9 +137,7 @@ public void fail(Throwable failure) {
139137
request.setDetails(dataConverter.toData(failure));
140138
request.setTaskToken(taskToken);
141139
try {
142-
Retryer.retry(
143-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
144-
() -> service.RespondActivityTaskFailed(request));
140+
RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailed(request));
145141
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1);
146142
} catch (EntityNotExistsError e) {
147143
throw new ActivityNotExistsException(e);
@@ -156,9 +152,7 @@ public void fail(Throwable failure) {
156152
request.setWorkflowID(execution.getWorkflowId());
157153
request.setRunID(execution.getRunId());
158154
try {
159-
Retryer.retry(
160-
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
161-
() -> service.RespondActivityTaskFailedByID(request));
155+
RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailedByID(request));
162156
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1);
163157
} catch (EntityNotExistsError e) {
164158
throw new ActivityNotExistsException(e);

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import com.uber.cadence.WorkflowType;
3333
import com.uber.cadence.common.RetryOptions;
3434
import com.uber.cadence.internal.common.OptionsUtils;
35-
import com.uber.cadence.internal.common.Retryer;
35+
import com.uber.cadence.internal.common.RpcRetryer;
3636
import com.uber.cadence.internal.metrics.MetricsTag;
3737
import com.uber.cadence.internal.metrics.MetricsType;
3838
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
@@ -667,7 +667,7 @@ public HistoryEvent next() {
667667

668668
try {
669669
GetWorkflowExecutionHistoryResponse r =
670-
Retryer.retryWithResult(
670+
RpcRetryer.retryWithResult(
671671
retryOptions, () -> service.GetWorkflowExecutionHistory(request));
672672
current = r.getHistory().getEventsIterator();
673673
nextPageToken = r.getNextPageToken();

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse
113113
e.printStackTrace(pw);
114114
String stackTrace = sw.toString();
115115
failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8));
116-
return new DecisionTaskHandler.Result(null, failedRequest, null, null);
116+
return new DecisionTaskHandler.Result(null, failedRequest, null);
117117
}
118118
}
119119

@@ -236,7 +236,7 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
236236
cache.markProcessingDone(decisionTask);
237237
}
238238
}
239-
return new Result(null, null, queryCompletedRequest, null);
239+
return new Result(null, null, queryCompletedRequest);
240240
}
241241

242242
private Result createCompletedRequest(
@@ -254,7 +254,7 @@ private Result createCompletedRequest(
254254
(int) stickyTaskListScheduleToStartTimeout.getSeconds());
255255
completedRequest.setStickyAttributes(attributes);
256256
}
257-
return new Result(completedRequest, null, null, null);
257+
return new Result(completedRequest, null, null);
258258
}
259259

260260
@Override

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ private ActivityTaskHandler.Result mapToActivityFailure(
147147
failure = CheckedExceptionWrapper.unwrap(failure);
148148
result.setReason(failure.getClass().getName());
149149
result.setDetails(dataConverter.toData(failure));
150-
return new ActivityTaskHandler.Result(
151-
null, new Result.TaskFailedResult(result, failure), null, null);
150+
return new ActivityTaskHandler.Result(null, new Result.TaskFailedResult(result, failure), null);
152151
}
153152

154153
@Override
@@ -214,12 +213,12 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
214213
Object result = method.invoke(activity, args);
215214
RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest();
216215
if (context.isDoNotCompleteOnReturn()) {
217-
return new ActivityTaskHandler.Result(null, null, null, null);
216+
return new ActivityTaskHandler.Result(null, null, null);
218217
}
219218
if (method.getReturnType() != Void.TYPE) {
220219
request.setResult(dataConverter.toData(result));
221220
}
222-
return new ActivityTaskHandler.Result(request, null, null, null);
221+
return new ActivityTaskHandler.Result(request, null, null);
223222
} catch (RuntimeException | IllegalAccessException e) {
224223
return mapToActivityFailure(e, metricsScope, false);
225224
} catch (InvocationTargetException e) {
@@ -252,7 +251,7 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
252251
if (method.getReturnType() != Void.TYPE) {
253252
request.setResult(dataConverter.toData(result));
254253
}
255-
return new ActivityTaskHandler.Result(request, null, null, null);
254+
return new ActivityTaskHandler.Result(request, null, null);
256255
} catch (RuntimeException | IllegalAccessException e) {
257256
return mapToActivityFailure(e, metricsScope, true);
258257
} catch (InvocationTargetException e) {

src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.uber.cadence.RespondActivityTaskCanceledRequest;
2222
import com.uber.cadence.RespondActivityTaskCompletedRequest;
2323
import com.uber.cadence.RespondActivityTaskFailedRequest;
24-
import com.uber.cadence.common.RetryOptions;
2524
import com.uber.m3.tally.Scope;
2625
import java.time.Duration;
2726

@@ -37,7 +36,6 @@ final class Result {
3736
private final RespondActivityTaskCompletedRequest taskCompleted;
3837
private final TaskFailedResult taskFailed;
3938
private final RespondActivityTaskCanceledRequest taskCancelled;
40-
private final RetryOptions requestRetryOptions;
4139
private int attempt;
4240
private Duration backoff;
4341

@@ -68,12 +66,10 @@ public Throwable getFailure() {
6866
public Result(
6967
RespondActivityTaskCompletedRequest taskCompleted,
7068
TaskFailedResult taskFailed,
71-
RespondActivityTaskCanceledRequest taskCancelled,
72-
RetryOptions requestRetryOptions) {
69+
RespondActivityTaskCanceledRequest taskCancelled) {
7370
this.taskCompleted = taskCompleted;
7471
this.taskFailed = taskFailed;
7572
this.taskCancelled = taskCancelled;
76-
this.requestRetryOptions = requestRetryOptions;
7773
}
7874

7975
public RespondActivityTaskCompletedRequest getTaskCompleted() {
@@ -88,10 +84,6 @@ public RespondActivityTaskCanceledRequest getTaskCancelled() {
8884
return taskCancelled;
8985
}
9086

91-
public RetryOptions getRequestRetryOptions() {
92-
return requestRetryOptions;
93-
}
94-
9587
public void setAttempt(int attempt) {
9688
this.attempt = attempt;
9789
}

0 commit comments

Comments
 (0)