27
27
import com .uber .cadence .workflow .Workflow ;
28
28
import com .uber .cadence .workflow .WorkflowMethod ;
29
29
import java .time .Duration ;
30
+ import java .util .ArrayList ;
31
+ import java .util .List ;
32
+ import java .util .concurrent .ArrayBlockingQueue ;
33
+ import java .util .concurrent .BlockingDeque ;
34
+ import java .util .concurrent .BlockingQueue ;
35
+ import java .util .concurrent .CompletableFuture ;
30
36
31
37
/**
32
38
* Demonstrates asynchronous signalling of a workflow. Requires a local instance of Cadence server
@@ -39,32 +45,53 @@ public class HelloSignal {
39
45
40
46
/** Workflow interface must have a method annotated with @WorkflowMethod. */
41
47
public interface GreetingWorkflow {
42
- /** @return greeting string */
48
+ /** @return list of greeting strings that were received through the
49
+ * waitForNameMethod. This method will block until the number of greetings
50
+ * specified are received. */
43
51
@ WorkflowMethod
44
- String getGreeting ();
52
+ List < String > getGreetings ();
45
53
46
54
/** Receives name through an external signal. */
47
55
@ SignalMethod
48
56
void waitForName (String name );
57
+
58
+ /** Receives name through an external signal. */
59
+ @ SignalMethod
60
+ void exit ();
49
61
}
50
62
51
63
/** GreetingWorkflow implementation that returns a greeting. */
52
64
public static class GreetingWorkflowImpl implements GreetingWorkflow {
53
65
54
- private final CompletablePromise <String > name = Workflow .newPromise ();
66
+ List <String > messageQueue = new ArrayList <>(10 );
67
+ boolean exit = false ;
55
68
56
69
@ Override
57
- public String getGreeting () {
58
- return "Hello " + name .get () + "!" ;
70
+ public List <String > getGreetings () {
71
+ List <String > receivedMessages = new ArrayList <>(10 );
72
+
73
+ while (true ) {
74
+ Workflow .await (() -> !messageQueue .isEmpty () || exit );
75
+ if (messageQueue .isEmpty () && exit ){
76
+ return receivedMessages ;
77
+ }
78
+ String message = messageQueue .remove (0 );
79
+ receivedMessages .add (message );
80
+ }
59
81
}
60
82
61
83
@ Override
62
84
public void waitForName (String name ) {
63
- this .name .complete (name );
85
+ messageQueue .add ("Hello " + name + "!" );
86
+ }
87
+
88
+ @ Override
89
+ public void exit () {
90
+ exit = true ;
64
91
}
65
92
}
66
93
67
- public static void main (String [] args ) {
94
+ public static void main (String [] args ) throws Exception {
68
95
// Start a worker that hosts the workflow implementation.
69
96
Worker worker = new Worker (DOMAIN , TASK_LIST );
70
97
worker .registerWorkflowImplementationTypes (GreetingWorkflowImpl .class );
@@ -80,17 +107,21 @@ public static void main(String[] args) {
80
107
.build ();
81
108
GreetingWorkflow workflow =
82
109
workflowClient .newWorkflowStub (GreetingWorkflow .class , workflowOptions );
110
+ workflow .wait ();
83
111
// Start workflow asynchronously to not use another thread to signal.
84
- WorkflowClient .start (workflow ::getGreeting );
112
+ WorkflowClient .start (workflow ::getGreetings );
85
113
// After start for getGreeting returns, the workflow is guaranteed to be started.
86
114
// So we can send a signal to it using workflow stub.
115
+ // This workflow keeps receiving signals until exit is called
87
116
workflow .waitForName ("World" );
117
+ workflow .waitForName ("Universe" );
118
+ workflow .exit ();
88
119
// Calling synchronous getGreeting after workflow has started reconnects to the existing
89
120
// workflow and blocks until a result is available. Note that this behavior assumes that
90
121
// WorkflowOptions are not configured with WorkflowIdReusePolicy.AllowDuplicate. In that case
91
122
// the call would fail with WorkflowExecutionAlreadyStartedException.
92
- String greeting = workflow .getGreeting ();
93
- System .out .println (greeting );
123
+ List < String > greetings = workflow .getGreetings ();
124
+ System .out .println (greetings );
94
125
System .exit (0 );
95
126
}
96
127
}
0 commit comments