Skip to content

Commit 126ac2b

Browse files
author
Liang Mei
committed
Add saga example
1 parent d9f4ad7 commit 126ac2b

File tree

2 files changed

+144
-1
lines changed

2 files changed

+144
-1
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ repositories {
3333
}
3434

3535
dependencies {
36-
compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.2.0'
36+
compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.5.2'
3737
compile group: 'commons-configuration', name: 'commons-configuration', version: '1.9'
3838
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
3939
testCompile group: 'junit', name: 'junit', version: '4.12'
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package com.uber.cadence.samples.hello;
2+
3+
import com.uber.cadence.activity.ActivityMethod;
4+
import com.uber.cadence.client.WorkflowClient;
5+
import com.uber.cadence.client.WorkflowOptions;
6+
import com.uber.cadence.worker.Worker;
7+
import com.uber.cadence.workflow.*;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
16+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
17+
18+
/**
19+
* Demonstrates implementing saga transaction and compensation logic using Cadence.
20+
*/
21+
public class HelloSaga {
22+
static final String TASK_LIST = "HelloSaga";
23+
static final List<String> transactions = new ArrayList<>();
24+
25+
public interface ChildWorkflowOperation {
26+
@WorkflowMethod
27+
void execute(int amount);
28+
}
29+
30+
public static class ChildWorkflowOperationImpl implements ChildWorkflowOperation {
31+
Logger log = Workflow.getLogger(ChildWorkflowOperationImpl.class);
32+
33+
public void execute(int amount) {
34+
log.info("ChildWorkflowOperationImpl.execute() is called.");
35+
transactions.add("child workflow execution: " + amount);
36+
}
37+
}
38+
39+
public interface ChildWorkflowCompensation {
40+
@WorkflowMethod
41+
void compensate(int amount);
42+
}
43+
44+
public static class ChildWorkflowCompensationImpl implements ChildWorkflowCompensation {
45+
Logger log = Workflow.getLogger(ChildWorkflowCompensationImpl.class);
46+
47+
public void compensate(int amount) {
48+
log.info("ChildWorkflowCompensationImpl.compensate() is called.");
49+
transactions.add("child workflow compensation: " + amount);
50+
}
51+
}
52+
53+
public interface ActivityOperation {
54+
@ActivityMethod(scheduleToCloseTimeoutSeconds = 2)
55+
void execute(int amount);
56+
}
57+
58+
public static class ActivityOperationImpl implements ActivityOperation {
59+
Logger log = LoggerFactory.getLogger(ActivityOperationImpl.class);
60+
61+
public void execute(int amount) {
62+
log.info("ActivityOperationImpl.execute() is called.");
63+
transactions.add("activity execution: " + amount);
64+
}
65+
}
66+
67+
public interface ActivityCompensation {
68+
@ActivityMethod(scheduleToCloseTimeoutSeconds = 2)
69+
void compensate(int amount);
70+
}
71+
72+
public static class ActivityCompensationImpl implements ActivityCompensation {
73+
Logger log = LoggerFactory.getLogger(ActivityCompensationImpl.class);
74+
75+
public void compensate(int amount) {
76+
log.info("ActivityCompensationImpl.execute() is called.");
77+
transactions.add("activity compensation: " + amount);
78+
}
79+
}
80+
81+
public interface SagaWorkflow {
82+
/**
83+
* Main saga workflow.
84+
*/
85+
@WorkflowMethod
86+
List<String> execute();
87+
}
88+
89+
public static class SagaWorkflowImpl implements SagaWorkflow {
90+
@Override
91+
public List<String> execute() {
92+
Saga saga = new Saga(new Saga.Options.Builder().setParallelCompensation(false).build());
93+
try {
94+
ChildWorkflowOperation op1 = Workflow.newChildWorkflowStub(ChildWorkflowOperation.class);
95+
op1.execute(10);
96+
ChildWorkflowCompensation c1 = Workflow.newChildWorkflowStub(ChildWorkflowCompensation.class);
97+
saga.addCompensation(c1::compensate, -10);
98+
99+
ActivityOperation op2 = Workflow.newActivityStub(ActivityOperation.class);
100+
Promise<Void> result = Async.procedure(op2::execute, 20);
101+
result.get();
102+
ActivityCompensation c2 = Workflow.newActivityStub(ActivityCompensation.class);
103+
saga.addCompensation(c2::compensate, -20);
104+
105+
transactions.add("main workflow: " + 30);
106+
saga.addCompensation(()->transactions.add("main workflow compensation: " + -30));
107+
108+
throw new RuntimeException("some error");
109+
110+
} catch (Exception e) {
111+
saga.compensate();
112+
}
113+
114+
return transactions;
115+
}
116+
}
117+
118+
119+
public static void main(String[] args) {
120+
// Start a worker that hosts the workflow implementation.
121+
Worker.Factory factory = new Worker.Factory(DOMAIN);
122+
Worker worker = factory.newWorker(TASK_LIST);
123+
worker.registerWorkflowImplementationTypes(
124+
HelloSaga.SagaWorkflowImpl.class,
125+
HelloSaga.ChildWorkflowOperationImpl.class,
126+
HelloSaga.ChildWorkflowCompensationImpl.class);
127+
worker.registerActivitiesImplementations(new ActivityOperationImpl(), new ActivityCompensationImpl());
128+
factory.start();
129+
130+
// Start a workflow execution. Usually this is done from another program.
131+
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
132+
// Get a workflow stub using the same task list the worker uses.
133+
WorkflowOptions workflowOptions =
134+
new WorkflowOptions.Builder()
135+
.setTaskList(TASK_LIST)
136+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(30))
137+
.build();
138+
HelloSaga.SagaWorkflow workflow =
139+
workflowClient.newWorkflowStub(HelloSaga.SagaWorkflow.class, workflowOptions);
140+
System.out.println(workflow.execute());
141+
System.exit(0);
142+
}
143+
}

0 commit comments

Comments
 (0)