Skip to content

Commit bdbe938

Browse files
authored
Updated documentation. (#147)
* Comments and moved MethodRetry to common * Updated javadoc * PR comments
1 parent 5fe28c6 commit bdbe938

22 files changed

+720
-96
lines changed

README.md

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Add *cadence-client* as a dependency to your *pom.xml*:
2929
<dependency>
3030
<groupId>com.uber</groupId>
3131
<artifactId>cadence-client</artifactId>
32-
<version>0.2.0-SPANPSHOT</version>
32+
<version>0.2.0-SNAPSHOT</version>
3333
</dependency>
3434

3535
# Overview
@@ -246,8 +246,9 @@ It can also answer to synchronous queries and receive external events (also know
246246
A workflow must define an interface class. All of its methods must have one of the following annotations:
247247
- @WorkflowMethod indicates an entry point to a workflow. It contains parameters such as timeouts and a task list. Required
248248
parameters (like executionStartToCloseTimeoutSeconds) that are not specified through the annotation must be provided at runtime.
249-
- @Signal indicates a method that reacts to external signals. It must have a `void` return type.
250-
- @Query indicates a method that reacts to synchronous query requests.
249+
- @SignalMethod indicates a method that reacts to external signals. It must have a `void` return
250+
type.
251+
- @QueryMethod indicates a method that reacts to synchronous query requests.
251252
You can have more than one method with the same annotation.
252253
```java
253254
public interface FileProcessingWorkflow {
@@ -372,17 +373,18 @@ with different options.
372373
### Calling Activities Asynchronously
373374

374375
Sometimes workflows need to perform certain operations in parallel.
375-
The `Workflow.async` static method allows you to invoke any activity asynchronously. The call returns a `Promise` result immediately.
376+
The `Async` class static methods allow you to invoke any activity asynchronously. The calls return a `Promise` result immediately.
376377
`Promise` is similar to both Java `Future` and `CompletionStage`. The `Promise` `get` blocks until a result is available.
377378
It also exposes the `thenApply` and `handle` methods. See the `Promise` JavaDoc for technical details about differences with `Future`.
378379

379380
To convert a synchronous call
380381
```java
381382
String localName = activities.download(surceBucket, sourceFile);
382383
```
383-
to asynchronous style, the method reference is passed to `Workflow.async` followed by activity arguments:
384+
to asynchronous style, the method reference is passed to `Async.function` or `Async.procedure`
385+
followed by activity arguments:
384386
```java
385-
Promise<String> localNamePromise = Workflow.async(activities::download, surceBucket, sourceFile);
387+
Promise<String> localNamePromise = Async.function(activities::download, surceBucket, sourceFile);
386388
```
387389
Then to wait synchronously for the result:
388390
```java
@@ -396,7 +398,7 @@ public void processFile(Arguments args) {
396398
try {
397399
// Download all files in parallel.
398400
for (String sourceFilename : args.getSourceFilenames()) {
399-
Promise<String> localName = Workflow.async(activities::download, args.getSourceBucketName(), sourceFilename);
401+
Promise<String> localName = Async.function(activities::download, args.getSourceBucketName(), sourceFilename);
400402
localNamePromises.add(localName);
401403
}
402404
// allOf converts a list of promises to a single promise that contains a list of each promise value.
@@ -410,7 +412,7 @@ public void processFile(Arguments args) {
410412
// Upload all results in parallel.
411413
List<Promise<Void>> uploadedList = new ArrayList<>();
412414
for (String processedName : processedNames) {
413-
Promise<Void> uploaded = Workflow.async(activities::upload, args.getTargetBucketName(), args.getTargetFilename(), processedName);
415+
Promise<Void> uploaded = Async.procedure(activities::upload, args.getTargetBucketName(), args.getTargetFilename(), processedName);
414416
uploadedList.add(uploaded);
415417
}
416418
// Wait for all uploads to complete.
@@ -439,7 +441,7 @@ Besides activities, a workflow can also orchestrate other workflows.
439441
the timeouts and task list if they differ from the ones defined in the @WorkflowMethod annotation or parent workflow.
440442

441443
The first call to the child workflow stub must always be to a method annotated with @WorkflowMethod. Similarly to activities, a call
442-
can be synchronous or asynchronous using `Workflow.async`. The synchronous call blocks until a child workflow completes. The asynchronous call
444+
can be made synchronous or asynchronous by using `Async#function` or `Async#procedure`. The synchronous call blocks until a child workflow completes. The asynchronous call
443445
returns a `Promise` that can be used to wait for the completion. After an async call returns the stub, it can be used to send signals to the child
444446
by calling methods annotated with `@SignalMethod`. Querying a child workflow by calling methods annotated with @QueryMethod
445447
from within workflow code is not supported. However, queries can be done from activities
@@ -470,11 +472,11 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow {
470472

471473
// Workflows are stateful, so a new stub must be created for each new child.
472474
GreetingChild child1 = Workflow.newChildWorkflowStub(GreetingChild.class);
473-
Promise<String> greeting1 = Workflow.async(child1::composeGreeting, "Hello", name);
475+
Promise<String> greeting1 = Async.function(child1::composeGreeting, "Hello", name);
474476

475477
// Both children will run concurrently.
476478
GreetingChild child2 = Workflow.newChildWorkflowStub(GreetingChild.class);
477-
Promise<String> greeting2 = Workflow.async(child2::composeGreeting, "Bye", name);
479+
Promise<String> greeting2 = Async.function(child2::composeGreeting, "Bye", name);
478480

479481
// Do something else here.
480482
...
@@ -497,7 +499,7 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow {
497499
@Override
498500
public String getGreeting(String name) {
499501
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
500-
Promise<String> greeting = Workflow.async(child::composeGreeting, "Hello", name);
502+
Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
501503
child.updateName("Cadence");
502504
return greeting.get();
503505
}
@@ -518,7 +520,7 @@ This design puts the following constraints on the workflow implementation:
518520
Always do this in activities.
519521
- Don’t perform any IO or service calls as they are not usually deterministic. Use activities for this.
520522
- Only use `Workflow.currentTimeMillis()` to get the current time inside a workflow.
521-
- Do not use native Java `Thread` or any other multi-threaded classes like `ThreadPoolExecutor`. Use `Async.invoke`
523+
- Do not use native Java `Thread` or any other multi-threaded classes like `ThreadPoolExecutor`. Use `Async.function` or `Async.procedure`
522524
to execute code asynchronously.
523525
- Don't use any synchronization, locks, and other standard Java blocking concurrency-related classes besides those provided
524526
by the Workflow class. There is no need in explicit synchronization because multi-threaded code inside a workflow is

src/main/java/com/uber/cadence/activity/Activity.java

Lines changed: 170 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,176 @@
2525
import java.util.concurrent.CancellationException;
2626

2727
/**
28-
* Use this method from within activity implementation to get information about activity task and
29-
* heartbeat.
28+
* An activity is the implementation of a particular task in the business logic.
29+
*
30+
* <h2>Activity Interface</h2>
31+
*
32+
* <p>Activities are defined as methods of a plain Java interface. Each method defines a single
33+
* activity type. A single workflow can use more than one activity interface and call more than one
34+
* activity method from the same interface. The only requirement is that activity method arguments
35+
* and return values are serializable to a byte array using the provided {@link
36+
* com.uber.cadence.converter.DataConverter} implementation. The default implementation uses JSON
37+
* serializer, but an alternative implementation can be easily configured.
38+
*
39+
* <p>Example of an interface that defines four activities:
40+
*
41+
* <pre><code>
42+
* public interface FileProcessingActivities {
43+
*
44+
* void upload(String bucketName, String localName, String targetName);
45+
*
46+
* String download(String bucketName, String remoteName);
47+
*
48+
* {@literal @}ActivityMethod(scheduleToCloseTimeoutSeconds = 2)
49+
* String processFile(String localName);
50+
*
51+
* void deleteLocalFile(String fileName);
52+
* }
53+
*
54+
* </code></pre>
55+
*
56+
* An optional {@literal @}{@link ActivityMethod} annotation can be used to specify activity options
57+
* like timeouts or a task list. Required options that are not specified through the annotation must
58+
* be specified at run time.
59+
*
60+
* <h2>Activity Implementation</h2>
61+
*
62+
* <p>Activity implementation is an implementation of an activity interface. A single instance of
63+
* the activity's implementation is shared across multiple simultaneous activity invocations.
64+
* Therefore, the activity implementation code must be <i>thread safe</i>.
65+
*
66+
* <p>The values passed to activities through invocation parameters or returned through a result
67+
* value are recorded in the execution history. The entire execution history is transferred from the
68+
* Cadence service to workflow workers when a workflow state needs to recover. A large execution
69+
* history can thus adversely impact the performance of your workflow. Therefore, be mindful of the
70+
* amount of data you transfer via activity invocation parameters or return values. Other than that,
71+
* no additional limitations exist on activity implementations.
72+
*
73+
* <pre><code>
74+
* public class FileProcessingActivitiesImpl implements FileProcessingActivities {
75+
*
76+
* private final AmazonS3 s3Client;
77+
*
78+
* private final String localDirectory;
79+
*
80+
* void upload(String bucketName, String localName, String targetName) {
81+
* File f = new File(localName);
82+
* s3Client.putObject(bucket, remoteName, f);
83+
* }
84+
*
85+
* String download(String bucketName, String remoteName, String localName) {
86+
* // Implementation omitted for brevity.
87+
* return downloadFileFromS3(bucketName, remoteName, localDirectory + localName);
88+
* }
89+
*
90+
* String processFile(String localName) {
91+
* // Implementation omitted for brevity.
92+
* return compressFile(localName);
93+
* }
94+
*
95+
* void deleteLocalFile(String fileName) {
96+
* File f = new File(localDirectory + fileName);
97+
* f.delete();
98+
* }
99+
* }
100+
* </code></pre>
101+
*
102+
* <h3>Accessing Activity Info</h3>
103+
*
104+
* <p>The {@link Activity} class provides static getters to access information about the workflow
105+
* that invoked it. Note that this information is stored in a thread-local variable. Therefore,
106+
* calls to Activity accessors succeed only in the thread that invoked the activity function.
107+
*
108+
* <pre><code>
109+
* public class FileProcessingActivitiesImpl implements FileProcessingActivities {
110+
*
111+
* {@literal @}Override
112+
* public String download(String bucketName, String remoteName, String localName) {
113+
* log.info("domain=" + Activity.getDomain());
114+
* WorkflowExecution execution = Activity.getWorkflowExecution();
115+
* log.info("workflowId=" + execution.getWorkflowId());
116+
* log.info("runId=" + execution.getRunId());
117+
* ActivityTask activityTask = Activity.getTask();
118+
* log.info("activityId=" + activityTask.getActivityId());
119+
* log.info("activityTimeout=" + activityTask.getStartToCloseTimeoutSeconds());
120+
* return downloadFileFromS3(bucketName, remoteName, localDirectory + localName);
121+
* }
122+
* ...
123+
* }
124+
* </code></pre>
125+
*
126+
* <h3>Asynchronous Activity Completion</h3>
127+
*
128+
* <p>Sometimes an activity lifecycle goes beyond a synchronous method invocation. For example, a
129+
* request can be put in a queue and later a reply comes and is picked up by a different worker
130+
* process. The whole request-reply interaction can be modeled as a single Cadence activity.
131+
*
132+
* <p>To indicate that an activity should not be completed upon its method return, call {@link
133+
* Activity#doNotCompleteOnReturn()} from the original activity thread. Then later, when replies
134+
* come, complete the activity using {@link com.uber.cadence.client.ActivityCompletionClient}. To
135+
* correlate activity invocation with completion use either {@code TaskToken} or workflow and
136+
* activity IDs.
137+
*
138+
* <pre><code>
139+
* public class FileProcessingActivitiesImpl implements FileProcessingActivities {
140+
*
141+
* public String download(String bucketName, String remoteName, String localName) {
142+
* byte[] taskToken = Activity.getTaskToken(); // Used to correlate reply
143+
* asyncDownloadFileFromS3(taskToken, bucketName, remoteName, localDirectory + localName);
144+
* Activity.doNotCompleteOnReturn();
145+
* return "ignored"; // Return value is ignored when doNotCompleteOnReturn was called.
146+
* }
147+
* ...
148+
* }
149+
* </code></pre>
150+
*
151+
* When the download is complete, the download service potentially calls back from a different
152+
* process:
153+
*
154+
* <pre><code>
155+
* public <R> void completeActivity(byte[] taskToken, R result) {
156+
* completionClient.complete(taskToken, result);
157+
* }
158+
*
159+
* public void failActivity(byte[] taskToken, Exception failure) {
160+
* completionClient.completeExceptionally(taskToken, failure);
161+
* }
162+
* </code></pre>
163+
*
164+
* <h3>Activity Heartbeating</h3>
165+
*
166+
* <p>Some activities are long running. To react to their crashes quickly, use a heartbeat
167+
* mechanism. Use the {@link Activity#heartbeat(Object)} function to let the Cadence service know
168+
* that the activity is still alive. You can piggyback `details` on an activity heartbeat. If an
169+
* activity times out, the last value of `details` is included in the ActivityTimeoutException
170+
* delivered to a workflow. Then the workflow can pass the details to the next activity invocation.
171+
* This acts as a periodic checkpointing mechanism of an activity's progress.
172+
*
173+
* <pre><code>
174+
* public class FileProcessingActivitiesImpl implements FileProcessingActivities {
175+
*
176+
* {@literal @}Override
177+
* public String download(String bucketName, String remoteName, String localName) {
178+
* InputStream inputStream = openInputStream(file);
179+
* try {
180+
* byte[] bytes = new byte[MAX_BUFFER_SIZE];
181+
* while ((read = inputStream.read(bytes)) != -1) {
182+
* totalRead += read;
183+
* f.write(bytes, 0, read);
184+
* // Let the service know about the download progress.
185+
* Activity.heartbeat(totalRead);
186+
* }
187+
* }finally{
188+
* inputStream.close();
189+
* }
190+
* }
191+
* ...
192+
* }
193+
* </code></pre>
194+
*
195+
* @see com.uber.cadence.worker.Worker
196+
* @see com.uber.cadence.workflow.Workflow
197+
* @see com.uber.cadence.client.WorkflowClient
30198
*/
31199
public final class Activity {
32200

src/main/java/com/uber/cadence/activity/ActivityOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
2121

22+
import com.uber.cadence.common.MethodRetry;
2223
import com.uber.cadence.common.RetryOptions;
2324
import java.time.Duration;
2425

src/main/java/com/uber/cadence/activity/ActivityTask.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,43 @@
1717

1818
package com.uber.cadence.activity;
1919

20-
import com.uber.cadence.ActivityType;
2120
import com.uber.cadence.WorkflowExecution;
21+
import java.time.Duration;
2222

23+
/**
24+
* The information about the activity task that the current activity is handling. Use {@link
25+
* Activity#getTask()} to access.
26+
*/
2327
public interface ActivityTask {
24-
byte[] getTaskToken();
2528

26-
byte[] getInput();
29+
/**
30+
* A correlation token that can be used to complete the activity asynchronously through {@link
31+
* com.uber.cadence.client.ActivityCompletionClient#complete(byte[], Object)}.
32+
*/
33+
byte[] getTaskToken();
2734

35+
/** ID and RunID of the workflow that scheduled the activity. */
2836
WorkflowExecution getWorkflowExecution();
2937

38+
/**
39+
* ID of the activity. This ID can be used to complete the activity asynchronously through {@link
40+
* com.uber.cadence.client.ActivityCompletionClient#complete(WorkflowExecution, String, Object)}.
41+
*/
3042
String getActivityId();
3143

32-
ActivityType getActivityType();
44+
/** Type of the activity. */
45+
String getActivityType();
3346

47+
/**
48+
* Time when the activity was initially scheduled by the workflow.
49+
*
50+
* @return timestamp in milliseconds
51+
*/
3452
long getScheduledTimestamp();
3553

36-
int getScheduleToCloseTimeoutSeconds();
37-
38-
void setScheduleToCloseTimeoutSecondsIsSet(boolean value);
54+
Duration getScheduleToCloseTimeout();
3955

40-
int getStartToCloseTimeoutSeconds();
56+
Duration getStartToCloseTimeout();
4157

42-
int getHeartbeatTimeoutSeconds();
58+
Duration getHeartbeatTimeout();
4359
}

src/main/java/com/uber/cadence/client/ActivityCompletionException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class ActivityCompletionException extends RuntimeException {
3131

3232
protected ActivityCompletionException(ActivityTask task) {
3333
execution = task.getWorkflowExecution();
34-
activityType = task.getActivityType().getName();
34+
activityType = task.getActivityType();
3535
activityId = task.getActivityId();
3636
}
3737

@@ -41,14 +41,14 @@ protected ActivityCompletionException(ActivityTask task, Throwable cause) {
4141
? "Execution="
4242
+ task.getWorkflowExecution()
4343
+ ", ActivityType="
44-
+ task.getActivityType().getName()
44+
+ task.getActivityType()
4545
+ ", ActivityID="
4646
+ task.getActivityId()
4747
: null,
4848
cause);
4949
if (task != null) {
5050
execution = task.getWorkflowExecution();
51-
activityType = task.getActivityType().getName();
51+
activityType = task.getActivityType();
5252
activityId = task.getActivityId();
5353
} else {
5454
execution = null;

0 commit comments

Comments
 (0)