Skip to content

Commit b37c7bb

Browse files
authored
Implement local activities (#273)
This PR adds local activities support in Java client.
1 parent cc4f3d0 commit b37c7bb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2208
-349
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ task registerDomain(type:JavaExec) {
189189

190190
test {
191191
dependsOn 'registerDomain'
192+
dependsOn 'checkLicenseMain'
192193
testLogging {
193194
events "passed", "skipped", "failed"
194195
exceptionFormat "full"

src/main/java/com/uber/cadence/activity/ActivityOptions.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.common.MethodRetry;
2323
import com.uber.cadence.common.RetryOptions;
2424
import java.time.Duration;
25+
import java.util.Objects;
2526

2627
/** Options used to configure how an activity is invoked. */
2728
public final class ActivityOptions {
@@ -43,11 +44,12 @@ public static ActivityOptions merge(ActivityMethod a, MethodRetry r, ActivityOpt
4344
}
4445
return new ActivityOptions.Builder()
4546
.setScheduleToCloseTimeout(
46-
merge(a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout()))
47+
mergeDuration(a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout()))
4748
.setScheduleToStartTimeout(
48-
merge(a.scheduleToStartTimeoutSeconds(), o.getScheduleToStartTimeout()))
49-
.setStartToCloseTimeout(merge(a.startToCloseTimeoutSeconds(), o.getStartToCloseTimeout()))
50-
.setHeartbeatTimeout(merge(a.heartbeatTimeoutSeconds(), o.getHeartbeatTimeout()))
49+
mergeDuration(a.scheduleToStartTimeoutSeconds(), o.getScheduleToStartTimeout()))
50+
.setStartToCloseTimeout(
51+
mergeDuration(a.startToCloseTimeoutSeconds(), o.getStartToCloseTimeout()))
52+
.setHeartbeatTimeout(mergeDuration(a.heartbeatTimeoutSeconds(), o.getHeartbeatTimeout()))
5153
.setTaskList(
5254
o.getTaskList() != null
5355
? o.getTaskList()
@@ -278,37 +280,26 @@ public boolean equals(Object o) {
278280
if (o == null || getClass() != o.getClass()) return false;
279281

280282
ActivityOptions that = (ActivityOptions) o;
281-
282-
if (heartbeatTimeout != null
283-
? !heartbeatTimeout.equals(that.heartbeatTimeout)
284-
: that.heartbeatTimeout != null) return false;
285-
if (scheduleToCloseTimeout != null
286-
? !scheduleToCloseTimeout.equals(that.scheduleToCloseTimeout)
287-
: that.scheduleToCloseTimeout != null) return false;
288-
if (scheduleToStartTimeout != null
289-
? !scheduleToStartTimeout.equals(that.scheduleToStartTimeout)
290-
: that.scheduleToStartTimeout != null) return false;
291-
if (startToCloseTimeout != null
292-
? !startToCloseTimeout.equals(that.startToCloseTimeout)
293-
: that.startToCloseTimeout != null) return false;
294-
if (taskList != null ? !taskList.equals(that.taskList) : that.taskList != null) return false;
295-
return retryOptions != null
296-
? retryOptions.equals(that.retryOptions)
297-
: that.retryOptions == null;
283+
return Objects.equals(heartbeatTimeout, that.heartbeatTimeout)
284+
&& Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
285+
&& Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout)
286+
&& Objects.equals(startToCloseTimeout, that.startToCloseTimeout)
287+
&& Objects.equals(taskList, that.taskList)
288+
&& Objects.equals(retryOptions, that.retryOptions);
298289
}
299290

300291
@Override
301292
public int hashCode() {
302-
int result = heartbeatTimeout != null ? heartbeatTimeout.hashCode() : 0;
303-
result = 31 * result + (scheduleToCloseTimeout != null ? scheduleToCloseTimeout.hashCode() : 0);
304-
result = 31 * result + (scheduleToStartTimeout != null ? scheduleToStartTimeout.hashCode() : 0);
305-
result = 31 * result + (startToCloseTimeout != null ? startToCloseTimeout.hashCode() : 0);
306-
result = 31 * result + (taskList != null ? taskList.hashCode() : 0);
307-
result = 31 * result + (retryOptions != null ? retryOptions.hashCode() : 0);
308-
return result;
293+
return Objects.hash(
294+
heartbeatTimeout,
295+
scheduleToCloseTimeout,
296+
scheduleToStartTimeout,
297+
startToCloseTimeout,
298+
taskList,
299+
retryOptions);
309300
}
310301

311-
private static Duration merge(int annotationSeconds, Duration options) {
302+
static Duration mergeDuration(int annotationSeconds, Duration options) {
312303
if (options == null) {
313304
if (annotationSeconds == 0) {
314305
return null;
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.activity;
19+
20+
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
21+
22+
import com.uber.cadence.common.MethodRetry;
23+
import com.uber.cadence.common.RetryOptions;
24+
import java.time.Duration;
25+
import java.util.Objects;
26+
27+
/** Options used to configure how an local activity is invoked. */
28+
public final class LocalActivityOptions {
29+
30+
/**
31+
* Used to merge annotation and options. Options takes precedence. Returns options with all
32+
* defaults filled in.
33+
*/
34+
public static LocalActivityOptions merge(
35+
ActivityMethod a, MethodRetry r, LocalActivityOptions o) {
36+
if (a == null) {
37+
if (r == null) {
38+
return new LocalActivityOptions.Builder(o).validateAndBuildWithDefaults();
39+
}
40+
RetryOptions mergedR = RetryOptions.merge(r, o.getRetryOptions());
41+
return new LocalActivityOptions.Builder()
42+
.setRetryOptions(mergedR)
43+
.validateAndBuildWithDefaults();
44+
}
45+
if (o == null) {
46+
o = new LocalActivityOptions.Builder().build();
47+
}
48+
return new LocalActivityOptions.Builder()
49+
.setScheduleToCloseTimeout(
50+
ActivityOptions.mergeDuration(
51+
a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout()))
52+
.setRetryOptions(RetryOptions.merge(r, o.getRetryOptions()))
53+
.validateAndBuildWithDefaults();
54+
}
55+
56+
public static final class Builder {
57+
private Duration scheduleToCloseTimeout;
58+
private RetryOptions retryOptions;
59+
60+
public Builder() {}
61+
62+
/** Copy Builder fields from the options. */
63+
public Builder(LocalActivityOptions options) {
64+
if (options == null) {
65+
return;
66+
}
67+
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
68+
this.retryOptions = options.retryOptions;
69+
}
70+
71+
/** Overall timeout workflow is willing to wait for activity to complete. */
72+
public Builder setScheduleToCloseTimeout(Duration scheduleToCloseTimeout) {
73+
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
74+
return this;
75+
}
76+
77+
/**
78+
* RetryOptions that define how activity is retried in case of failure. Default is null which is
79+
* no reties.
80+
*/
81+
public Builder setRetryOptions(RetryOptions retryOptions) {
82+
this.retryOptions = retryOptions;
83+
return this;
84+
}
85+
86+
public LocalActivityOptions build() {
87+
return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions);
88+
}
89+
90+
public LocalActivityOptions validateAndBuildWithDefaults() {
91+
RetryOptions ro = null;
92+
if (retryOptions != null) {
93+
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
94+
}
95+
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro);
96+
}
97+
}
98+
99+
private final Duration scheduleToCloseTimeout;
100+
private final RetryOptions retryOptions;
101+
102+
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions) {
103+
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
104+
this.retryOptions = retryOptions;
105+
}
106+
107+
public Duration getScheduleToCloseTimeout() {
108+
return scheduleToCloseTimeout;
109+
}
110+
111+
public RetryOptions getRetryOptions() {
112+
return retryOptions;
113+
}
114+
115+
@Override
116+
public String toString() {
117+
return "LocalActivityOptions{"
118+
+ "scheduleToCloseTimeout="
119+
+ scheduleToCloseTimeout
120+
+ ", retryOptions="
121+
+ retryOptions
122+
+ '}';
123+
}
124+
125+
@Override
126+
public boolean equals(Object o) {
127+
if (this == o) return true;
128+
if (o == null || getClass() != o.getClass()) return false;
129+
130+
LocalActivityOptions that = (LocalActivityOptions) o;
131+
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
132+
&& Objects.equals(retryOptions, that.retryOptions);
133+
}
134+
135+
@Override
136+
public int hashCode() {
137+
return Objects.hash(scheduleToCloseTimeout, retryOptions);
138+
}
139+
}

src/main/java/com/uber/cadence/common/RetryOptions.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.uber.cadence.common;
1919

2020
import com.google.common.base.Defaults;
21+
import com.uber.cadence.workflow.ActivityFailureException;
22+
import com.uber.cadence.workflow.ChildWorkflowFailureException;
2123
import java.time.Duration;
2224
import java.util.Arrays;
2325
import java.util.Collections;
@@ -27,6 +29,7 @@
2729
public final class RetryOptions {
2830

2931
private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
32+
private static final int DEFAULT_MAXIMUM_MULTIPLIER = 100;
3033

3134
/**
3235
* Merges annotation with explicitly provided RetryOptions. If there is conflict RetryOptions
@@ -403,4 +406,32 @@ private Class<? extends Throwable>[] merge(
403406
}
404407
return null;
405408
}
409+
410+
public long calculateSleepTime(long attempt) {
411+
double coefficient =
412+
backoffCoefficient == 0d ? DEFAULT_BACKOFF_COEFFICIENT : backoffCoefficient;
413+
double sleepMillis = Math.pow(coefficient, attempt - 1) * initialInterval.toMillis();
414+
if (maximumInterval == null) {
415+
return (long) Math.min(sleepMillis, initialInterval.toMillis() * DEFAULT_MAXIMUM_MULTIPLIER);
416+
}
417+
return Math.min((long) sleepMillis, maximumInterval.toMillis());
418+
}
419+
420+
public boolean shouldRethrow(Throwable e, long attempt, long elapsed, long sleepTime) {
421+
if (e instanceof ActivityFailureException || e instanceof ChildWorkflowFailureException) {
422+
e = e.getCause();
423+
}
424+
if (doNotRetry != null) {
425+
for (Class<? extends Throwable> doNotRetry : doNotRetry) {
426+
if (doNotRetry.equals(e.getClass())) {
427+
return true;
428+
}
429+
}
430+
}
431+
// Attempt that failed.
432+
if (maximumAttempts != 0 && attempt >= maximumAttempts) {
433+
return true;
434+
}
435+
return expiration != null && elapsed + sleepTime >= expiration.toMillis();
436+
}
406437
}

0 commit comments

Comments
 (0)