Skip to content

Commit ac68a92

Browse files
authored
Rewrote fileprocessing to not use S3 (#5)
1 parent 0341953 commit ac68a92

15 files changed

+256
-574
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ The steps for configuring and building other samples for Java Cadence Client are
7878
set AWS_SWF_SAMPLES_CONFIG=<Your SDK Directory>
7979

8080
## Prerequisite
81-
Run Cadence Server
81+
Run Cadence Server using Docker Compose
8282

8383
curl -O https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml
8484
docker-compose up

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingActivities.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingActivitiesZipImpl.java

Lines changed: 0 additions & 88 deletions
This file was deleted.

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingConfigKeys.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingStarter.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,52 +16,34 @@
1616
*/
1717
package com.uber.cadence.samples.fileprocessing;
1818

19-
import com.uber.cadence.WorkflowService;
2019
import com.uber.cadence.client.WorkflowClient;
21-
import com.uber.cadence.samples.common.ConfigHelper;
20+
21+
import java.net.URL;
22+
23+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
2224

2325
/**
24-
* This is used for launching a Workflow instance of FileProcessingWorkflowExample
26+
* This is used for launching a Workflow instance of file processing sample.
2527
*/
2628
public class FileProcessingStarter {
2729

28-
static final String WORKFLOW_TASK_LIST = "FileProcessing";
29-
30-
private static WorkflowService.Iface swfService;
31-
private static String domain;
32-
3330
public static void main(String[] args) throws Exception {
34-
35-
// Load configuration
36-
ConfigHelper configHelper = ConfigHelper.createConfig();
37-
38-
// Create the client for Simple Workflow Service
39-
swfService = configHelper.createWorkflowClient();
40-
domain = configHelper.getDomain();
41-
4231
// Start Workflow instance
43-
String sourceBucketName = configHelper.getValueFromConfig(FileProcessingConfigKeys.WORKFLOW_INPUT_SOURCEBUCKETNAME_KEY);
44-
String sourceFilename = configHelper.getValueFromConfig(FileProcessingConfigKeys.WORKFLOW_INPUT_SOURCEFILENAME_KEY);
45-
String targetBucketName = configHelper.getValueFromConfig(FileProcessingConfigKeys.WORKFLOW_INPUT_TARGETBUCKETNAME_KEY);
46-
String targetFilename = configHelper.getValueFromConfig(FileProcessingConfigKeys.WORKFLOW_INPUT_TARGETFILENAME_KEY);
32+
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
33+
FileProcessingWorkflow workflow = workflowClient.newWorkflowStub(FileProcessingWorkflow.class);
4734

48-
FileProcessingWorkflow.Arguments workflowArgs = new FileProcessingWorkflow.Arguments();
49-
workflowArgs.setSourceBucketName(sourceBucketName);
50-
workflowArgs.setSourceFilename(sourceFilename);
51-
workflowArgs.setTargetBucketName(targetBucketName);
52-
workflowArgs.setTargetFilename(targetFilename);
35+
System.out.println("Executing FileProcessingWorkflow");
5336

54-
WorkflowClient workflowClient = WorkflowClient.newInstance(swfService, domain);
55-
FileProcessingWorkflow workflow = workflowClient.newWorkflowStub(FileProcessingWorkflow.class);
37+
URL source = new URL("http://www.google.com/");
38+
URL destination = new URL("http://dummy");
5639

5740
// This is going to block until the workflow completion.
5841
// This is rarely used in production. Use the commented code below for async start version.
59-
System.out.println("Executing FileProcessingWorkflow");
60-
workflow.processFile(workflowArgs);
42+
workflow.processFile(source, destination);
43+
System.out.println("FileProcessingWorkflow completed");
6144

6245
// Use this code instead of the above blocking call to start workflow asynchronously.
63-
// WorkflowExecution workflowExecution = WorkflowClient.asyncStart(workflow::processFile, workflowArgs);
64-
//
46+
// WorkflowExecution workflowExecution = WorkflowClient.asyncStart(workflow::processFile, source, destination);
6547
// System.out.println("Started periodic workflow with workflowId=\"" + workflowExecution.getWorkflowId()
6648
// + "\" and runId=\"" + workflowExecution.getRunId() + "\"");
6749
//

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingWorker.java

Lines changed: 16 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,76 +16,38 @@
1616
*/
1717
package com.uber.cadence.samples.fileprocessing;
1818

19-
import com.amazonaws.services.s3.AmazonS3;
20-
import com.uber.cadence.WorkflowService;
21-
import com.uber.cadence.samples.common.ConfigHelper;
2219
import com.uber.cadence.worker.Worker;
23-
import com.uber.cadence.worker.WorkerOptions;
2420

25-
import java.io.IOException;
26-
import java.net.InetAddress;
27-
import java.net.UnknownHostException;
28-
import java.time.Duration;
21+
import java.lang.management.ManagementFactory;
22+
23+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
2924

3025
/**
31-
* This is the process which hosts all workflows and activities in this sample
26+
* This is the process which hosts all workflows and activities in this sample.
27+
* Run multiple instances of the worker in different windows. Then start workflow
28+
* by running FileProcessingStarter. Note that all activities always execute on the same worker.
29+
* But each time they might end up on a different worker as the first activity is dispatched to the common task list.
3230
*/
3331
public class FileProcessingWorker {
3432

3533
static final String TASK_LIST = "FileProcessing";
3634

37-
public static void main(String[] args) throws Exception {
38-
ConfigHelper configHelper = ConfigHelper.createConfig();
39-
WorkflowService.Iface swfService = configHelper.createWorkflowClient();
40-
AmazonS3 s3Client = configHelper.createS3Client();
41-
String domain = configHelper.getDomain();
35+
public static void main(String[] args) {
4236

43-
String localFolder = configHelper.getValueFromConfig(FileProcessingConfigKeys.ACTIVITY_WORKER_LOCALFOLDER);
37+
String hostSpecifiTaskList = ManagementFactory.getRuntimeMXBean().getName();
4438

4539
// Start worker to poll the common task list
46-
final Worker workerForCommonTaskList = new Worker(swfService, domain, TASK_LIST, null);
47-
SimpleStoreActivitiesS3Impl storeActivityImpl = new SimpleStoreActivitiesS3Impl(s3Client, localFolder, getHostName());
40+
final Worker workerForCommonTaskList = new Worker(DOMAIN, TASK_LIST);
41+
workerForCommonTaskList.registerWorkflowImplementationTypes(FileProcessingWorkflowImpl.class);
42+
StoreActivitiesImpl storeActivityImpl = new StoreActivitiesImpl(hostSpecifiTaskList);
4843
workerForCommonTaskList.registerActivitiesImplementations(storeActivityImpl);
49-
workerForCommonTaskList.registerWorkflowImplementationTypes(FileProcessingWorkflowZipImpl.class);
50-
5144
workerForCommonTaskList.start();
52-
System.out.println("Worker tarted for task list: " + TASK_LIST);
45+
System.out.println("Worker started for task list: " + TASK_LIST);
5346

5447
// Start worker to poll the host specific task list
55-
WorkerOptions hostSpecificOptions = new WorkerOptions.Builder().build();
56-
final Worker workerForHostSpecificTaskList = new Worker(swfService, domain, getHostName(), hostSpecificOptions);
57-
FileProcessingActivitiesZipImpl processorActivityImpl = new FileProcessingActivitiesZipImpl(localFolder);
58-
workerForHostSpecificTaskList.registerActivitiesImplementations(storeActivityImpl, processorActivityImpl);
48+
final Worker workerForHostSpecificTaskList = new Worker(DOMAIN, hostSpecifiTaskList);
49+
workerForHostSpecificTaskList.registerActivitiesImplementations(storeActivityImpl);
5950
workerForHostSpecificTaskList.start();
60-
System.out.println("Worker Started for activity and workflow task List: " + getHostName());
61-
62-
Runtime.getRuntime().addShutdownHook(new Thread() {
63-
64-
public void run() {
65-
workerForCommonTaskList.shutdown(Duration.ofSeconds(5));
66-
workerForHostSpecificTaskList.shutdown(Duration.ofSeconds(5));
67-
System.out.println("Activity Workers Exited.");
68-
}
69-
});
70-
71-
System.out.println("Please press any key to terminate service.");
72-
73-
try {
74-
System.in.read();
75-
} catch (IOException e) {
76-
e.printStackTrace();
77-
}
78-
System.exit(0);
79-
51+
System.out.println("Worker Started for activity task List: " + hostSpecifiTaskList);
8052
}
81-
82-
static String getHostName() {
83-
try {
84-
InetAddress addr = InetAddress.getLocalHost();
85-
return addr.getHostName();
86-
} catch (UnknownHostException e) {
87-
throw new Error(e);
88-
}
89-
}
90-
9153
}

src/main/java/com/uber/cadence/samples/fileprocessing/FileProcessingWorkflow.java

Lines changed: 5 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,14 @@
1818

1919
import com.uber.cadence.workflow.WorkflowMethod;
2020

21+
import java.net.URL;
22+
2123
/**
2224
* Contract for file processing workflow
2325
*/
2426
public interface FileProcessingWorkflow {
2527

26-
final class Arguments {
27-
String sourceBucketName;
28-
String sourceFilename;
29-
String targetBucketName;
30-
String targetFilename;
31-
32-
public String getSourceBucketName() {
33-
return sourceBucketName;
34-
}
35-
36-
public void setSourceBucketName(String sourceBucketName) {
37-
this.sourceBucketName = sourceBucketName;
38-
}
39-
40-
public String getSourceFilename() {
41-
return sourceFilename;
42-
}
43-
44-
public void setSourceFilename(String sourceFilename) {
45-
this.sourceFilename = sourceFilename;
46-
}
47-
48-
public String getTargetBucketName() {
49-
return targetBucketName;
50-
}
51-
52-
public void setTargetBucketName(String targetBucketName) {
53-
this.targetBucketName = targetBucketName;
54-
}
55-
56-
public String getTargetFilename() {
57-
return targetFilename;
58-
}
59-
60-
public void setTargetFilename(String targetFilename) {
61-
this.targetFilename = targetFilename;
62-
}
63-
}
64-
/**
65-
* Uses a structure as arguments, to make addition of new arguments a backwards compatible change.
66-
*/
67-
@WorkflowMethod(taskList = FileProcessingStarter.WORKFLOW_TASK_LIST,
68-
executionStartToCloseTimeoutSeconds = 300)
69-
void processFile(Arguments args);
28+
@WorkflowMethod(taskList = FileProcessingWorker.TASK_LIST,
29+
executionStartToCloseTimeoutSeconds = 30)
30+
void processFile(URL source, URL destination);
7031
}

0 commit comments

Comments
 (0)