Skip to content

Commit 1d7c227

Browse files
authored
Default retry policy for Cadence service API calls (#323)
* Use retry policy for Cadence service API calls
* Move default retry option into internal package
1 parent 5aa464a commit 1d7c227

File tree

5 files changed: 88 additions (+88) and 46 deletions (-46).

src/main/java/com/uber/cadence/internal/common/Retryer.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap;
2121

22+
import com.uber.cadence.*;
2223
import com.uber.cadence.common.RetryOptions;
2324
import java.time.Duration;
2425
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,34 @@
3031
import org.slf4j.LoggerFactory;
3132

3233
public final class Retryer {
34+
public static final RetryOptions DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
35+
36+
private static final Duration RETRY_SERVICE_OPERATION_INITIAL_INTERVAL = Duration.ofMillis(20);
37+
private static final Duration RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL = Duration.ofMinutes(1);
38+
private static final double RETRY_SERVICE_OPERATION_BACKOFF = 1.2;
39+
40+
static {
41+
RetryOptions.Builder roBuilder =
42+
new RetryOptions.Builder()
43+
.setInitialInterval(RETRY_SERVICE_OPERATION_INITIAL_INTERVAL)
44+
.setExpiration(RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL)
45+
.setBackoffCoefficient(RETRY_SERVICE_OPERATION_BACKOFF);
46+
47+
Duration maxInterval = RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL.dividedBy(10);
48+
if (maxInterval.compareTo(RETRY_SERVICE_OPERATION_INITIAL_INTERVAL) < 0) {
49+
maxInterval = RETRY_SERVICE_OPERATION_INITIAL_INTERVAL;
50+
}
51+
roBuilder.setMaximumInterval(maxInterval);
52+
roBuilder.setDoNotRetry(
53+
BadRequestError.class,
54+
EntityNotExistsError.class,
55+
WorkflowExecutionAlreadyStartedError.class,
56+
DomainAlreadyExistsError.class,
57+
QueryFailedError.class,
58+
DomainNotActiveError.class,
59+
CancellationAlreadyRequestedError.class);
60+
DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults();
61+
}
3362

3463
public interface RetryableProc<E extends Throwable> {
3564

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,21 @@
1818
package com.uber.cadence.internal.external;
1919

2020
import com.google.common.base.Strings;
21-
import com.uber.cadence.*;
22-
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
23-
import com.uber.cadence.internal.common.RetryParameters;
24-
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
25-
import com.uber.cadence.internal.common.StartWorkflowExecutionParameters;
26-
import com.uber.cadence.internal.common.TerminateWorkflowExecutionParameters;
21+
import com.uber.cadence.Memo;
22+
import com.uber.cadence.QueryWorkflowRequest;
23+
import com.uber.cadence.QueryWorkflowResponse;
24+
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
25+
import com.uber.cadence.RetryPolicy;
26+
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
27+
import com.uber.cadence.SignalWorkflowExecutionRequest;
28+
import com.uber.cadence.StartWorkflowExecutionRequest;
29+
import com.uber.cadence.StartWorkflowExecutionResponse;
30+
import com.uber.cadence.TaskList;
31+
import com.uber.cadence.TerminateWorkflowExecutionRequest;
32+
import com.uber.cadence.WorkflowExecution;
33+
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
34+
import com.uber.cadence.WorkflowQuery;
35+
import com.uber.cadence.internal.common.*;
2736
import com.uber.cadence.internal.metrics.MetricsTag;
2837
import com.uber.cadence.internal.metrics.MetricsType;
2938
import com.uber.cadence.internal.replay.QueryWorkflowParameters;
@@ -40,9 +49,7 @@
4049
public final class GenericWorkflowClientExternalImpl implements GenericWorkflowClientExternal {
4150

4251
private final String domain;
43-
4452
private final IWorkflowService service;
45-
4653
private final Scope metricsScope;
4754

4855
public GenericWorkflowClientExternalImpl(
@@ -117,7 +124,10 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
117124

118125
StartWorkflowExecutionResponse result;
119126
try {
120-
result = service.StartWorkflowExecution(request);
127+
result =
128+
Retryer.retryWithResult(
129+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
130+
() -> service.StartWorkflowExecution(request));
121131
} catch (WorkflowExecutionAlreadyStartedError e) {
122132
throw e;
123133
} catch (TException e) {
@@ -166,7 +176,9 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
166176
execution.setWorkflowId(signalParameters.getWorkflowId());
167177
request.setWorkflowExecution(execution);
168178
try {
169-
service.SignalWorkflowExecution(request);
179+
Retryer.retry(
180+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
181+
() -> service.SignalWorkflowExecution(request));
170182
} catch (TException e) {
171183
throw CheckedExceptionWrapper.wrap(e);
172184
}
@@ -212,7 +224,10 @@ public WorkflowExecution signalWithStartWorkflowExecution(
212224
}
213225
StartWorkflowExecutionResponse result;
214226
try {
215-
result = service.SignalWithStartWorkflowExecution(request);
227+
result =
228+
Retryer.retryWithResult(
229+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
230+
() -> service.SignalWithStartWorkflowExecution(request));
216231
} catch (TException e) {
217232
throw CheckedExceptionWrapper.wrap(e);
218233
}
@@ -228,7 +243,9 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) {
228243
request.setDomain(domain);
229244
request.setWorkflowExecution(execution);
230245
try {
231-
service.RequestCancelWorkflowExecution(request);
246+
Retryer.retry(
247+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
248+
() -> service.RequestCancelWorkflowExecution(request));
232249
} catch (TException e) {
233250
throw CheckedExceptionWrapper.wrap(e);
234251
}
@@ -246,7 +263,10 @@ public byte[] queryWorkflow(QueryWorkflowParameters queryParameters) {
246263
query.setQueryType(queryParameters.getQueryType());
247264
request.setQuery(query);
248265
try {
249-
QueryWorkflowResponse response = service.QueryWorkflow(request);
266+
QueryWorkflowResponse response =
267+
Retryer.retryWithResult(
268+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
269+
() -> service.QueryWorkflow(request));
250270
return response.getQueryResult();
251271
} catch (TException e) {
252272
throw CheckedExceptionWrapper.wrap(e);
@@ -268,7 +288,9 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term
268288
request.setReason(terminateParameters.getReason());
269289
// request.setChildPolicy(terminateParameters.getChildPolicy());
270290
try {
271-
service.TerminateWorkflowExecution(request);
291+
Retryer.retry(
292+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
293+
() -> service.TerminateWorkflowExecution(request));
272294
} catch (TException e) {
273295
throw CheckedExceptionWrapper.wrap(e);
274296
}

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.uber.cadence.client.ActivityCompletionFailureException;
3232
import com.uber.cadence.client.ActivityNotExistsException;
3333
import com.uber.cadence.converter.DataConverter;
34+
import com.uber.cadence.internal.common.Retryer;
3435
import com.uber.cadence.internal.metrics.MetricsType;
3536
import com.uber.cadence.serviceclient.IWorkflowService;
3637
import com.uber.m3.tally.Scope;
@@ -55,7 +56,7 @@ class ManualActivityCompletionClientImpl extends ManualActivityCompletionClient
5556
private final String activityId;
5657
private final Scope metricsScope;
5758

58-
public ManualActivityCompletionClientImpl(
59+
ManualActivityCompletionClientImpl(
5960
IWorkflowService service, byte[] taskToken, DataConverter dataConverter, Scope metricsScope) {
6061
this.service = service;
6162
this.taskToken = taskToken;
@@ -66,7 +67,7 @@ public ManualActivityCompletionClientImpl(
6667
this.metricsScope = metricsScope;
6768
}
6869

69-
public ManualActivityCompletionClientImpl(
70+
ManualActivityCompletionClientImpl(
7071
IWorkflowService service,
7172
String domain,
7273
WorkflowExecution execution,
@@ -90,7 +91,9 @@ public void complete(Object result) {
9091
request.setResult(convertedResult);
9192
request.setTaskToken(taskToken);
9293
try {
93-
service.RespondActivityTaskCompleted(request);
94+
Retryer.retry(
95+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
96+
() -> service.RespondActivityTaskCompleted(request));
9497
metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
9598
} catch (EntityNotExistsError e) {
9699
throw new ActivityNotExistsException(e);
@@ -132,7 +135,9 @@ public void fail(Throwable failure) {
132135
request.setDetails(dataConverter.toData(failure));
133136
request.setTaskToken(taskToken);
134137
try {
135-
service.RespondActivityTaskFailed(request);
138+
Retryer.retry(
139+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
140+
() -> service.RespondActivityTaskFailed(request));
136141
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1);
137142
} catch (EntityNotExistsError e) {
138143
throw new ActivityNotExistsException(e);
@@ -147,7 +152,9 @@ public void fail(Throwable failure) {
147152
request.setWorkflowID(execution.getWorkflowId());
148153
request.setRunID(execution.getRunId());
149154
try {
150-
service.RespondActivityTaskFailedByID(request);
155+
Retryer.retry(
156+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
157+
() -> service.RespondActivityTaskFailedByID(request));
151158
metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1);
152159
} catch (EntityNotExistsError e) {
153160
throw new ActivityNotExistsException(e);

src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.uber.cadence.common.RetryOptions;
2121
import com.uber.cadence.converter.DataConverter;
2222
import com.uber.cadence.converter.JsonDataConverter;
23+
import com.uber.cadence.internal.common.Retryer;
2324
import com.uber.cadence.internal.metrics.NoopScope;
2425
import com.uber.m3.tally.Scope;
2526
import java.time.Duration;
@@ -90,21 +91,11 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
9091

9192
public SingleWorkerOptions build() {
9293
if (reportCompletionRetryOptions == null) {
93-
reportCompletionRetryOptions =
94-
new RetryOptions.Builder()
95-
.setInitialInterval(Duration.ofMillis(50))
96-
.setMaximumInterval(Duration.ofSeconds(10))
97-
.setExpiration(Duration.ofMinutes(1))
98-
.build();
94+
reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
9995
}
10096

10197
if (reportFailureRetryOptions == null) {
102-
reportFailureRetryOptions =
103-
new RetryOptions.Builder()
104-
.setInitialInterval(Duration.ofMillis(50))
105-
.setMaximumInterval(Duration.ofSeconds(10))
106-
.setExpiration(Duration.ofMinutes(1))
107-
.build();
98+
reportFailureRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
10899
}
109100

110101
if (pollerOptions == null) {

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105

106106
public class WorkflowServiceTChannel implements IWorkflowService {
107107

108-
public static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933;
108+
private static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933;
109109

110110
private static final String LOCALHOST = "127.0.0.1";
111111

@@ -117,10 +117,10 @@ public class WorkflowServiceTChannel implements IWorkflowService {
117117
/** Default RPC timeout for QueryWorkflow */
118118
private static final long DEFAULT_QUERY_RPC_TIMEOUT_MILLIS = 10000;
119119

120-
public static final String DEFAULT_CLIENT_APP_NAME = "cadence-client";
120+
private static final String DEFAULT_CLIENT_APP_NAME = "cadence-client";
121121

122122
/** Name of the Cadence service front end as required by TChannel. */
123-
public static final String DEFAULT_SERVICE_NAME = "cadence-frontend";
123+
private static final String DEFAULT_SERVICE_NAME = "cadence-frontend";
124124

125125
private static final Logger log = LoggerFactory.getLogger(WorkflowServiceTChannel.class);
126126

@@ -811,8 +811,7 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
811811
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request =
812812
buildThriftRequest(
813813
"GetWorkflowExecutionHistory",
814-
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest),
815-
options.getRpcLongPollTimeoutMillis());
814+
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest));
816815
response = doRemoteCall(request);
817816
WorkflowService.GetWorkflowExecutionHistory_result result =
818817
response.getBody(WorkflowService.GetWorkflowExecutionHistory_result.class);
@@ -905,8 +904,7 @@ private RespondDecisionTaskCompletedResponse respondDecisionTaskCompleted(
905904
ThriftRequest<WorkflowService.RespondDecisionTaskCompleted_args> request =
906905
buildThriftRequest(
907906
"RespondDecisionTaskCompleted",
908-
new WorkflowService.RespondDecisionTaskCompleted_args(completedRequest),
909-
options.getRpcLongPollTimeoutMillis());
907+
new WorkflowService.RespondDecisionTaskCompleted_args(completedRequest));
910908
response = doRemoteCall(request);
911909
WorkflowService.RespondDecisionTaskCompleted_result result =
912910
response.getBody(WorkflowService.RespondDecisionTaskCompleted_result.class);
@@ -1519,10 +1517,7 @@ private void signalWorkflowExecution(SignalWorkflowExecutionRequest signalReques
15191517

15201518
@Override
15211519
public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(
1522-
SignalWithStartWorkflowExecutionRequest signalWithStartRequest)
1523-
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
1524-
DomainNotActiveError, LimitExceededError, WorkflowExecutionAlreadyStartedError,
1525-
TException {
1520+
SignalWithStartWorkflowExecutionRequest signalWithStartRequest) throws TException {
15261521
return measureRemoteCall(
15271522
ServiceMethod.SIGNAL_WITH_START_WORKFLOW_EXECUTION,
15281523
() -> signalWithStartWorkflowExecution(signalWithStartRequest));
@@ -1536,8 +1531,7 @@ private StartWorkflowExecutionResponse signalWithStartWorkflowExecution(
15361531
ThriftRequest<WorkflowService.SignalWithStartWorkflowExecution_args> request =
15371532
buildThriftRequest(
15381533
"SignalWithStartWorkflowExecution",
1539-
new WorkflowService.SignalWithStartWorkflowExecution_args(signalWithStartRequest),
1540-
options.getRpcLongPollTimeoutMillis());
1534+
new WorkflowService.SignalWithStartWorkflowExecution_args(signalWithStartRequest));
15411535
response = doRemoteCall(request);
15421536
WorkflowService.SignalWithStartWorkflowExecution_result result =
15431537
response.getBody(WorkflowService.SignalWithStartWorkflowExecution_result.class);
@@ -1943,8 +1937,7 @@ public void GetWorkflowExecutionHistory(
19431937
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request =
19441938
buildThriftRequest(
19451939
"GetWorkflowExecutionHistory",
1946-
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest),
1947-
options.getRpcLongPollTimeoutMillis());
1940+
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest));
19481941
response = doRemoteCallAsync(request);
19491942

19501943
response

0 commit comments

Comments
 (0)