Skip to content

Commit 82d366f

Browse files
authored
Make sure signals are applied in the same order as they appear in history (#318)
* Make sure signals in the same decision are applied in the same order as they appear in history * Support suspend/resume polling methods in worker factory
1 parent d09fb23 commit 82d366f

File tree

4 files changed

+160
-15
lines changed

4 files changed

+160
-15
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,23 +202,34 @@ public void runUntilAllBlocked() throws Throwable {
202202
outerLoop:
203203
do {
204204
threadsToAdd.clear();
205-
for (NamedRunnable nr : toExecuteInWorkflowThread) {
206-
WorkflowThread thread =
207-
new WorkflowThreadImpl(
208-
false,
209-
threadPool,
210-
this,
211-
nr.name,
212-
false,
213-
runnerCancellationScope,
214-
nr.runnable,
215-
cache);
205+
206+
if (!toExecuteInWorkflowThread.isEmpty()) {
207+
List<WorkflowThread> callbackThreads = new ArrayList<>(toExecuteInWorkflowThread.size());
208+
for (NamedRunnable nr : toExecuteInWorkflowThread) {
209+
WorkflowThread thread =
210+
new WorkflowThreadImpl(
211+
false,
212+
threadPool,
213+
this,
214+
nr.name,
215+
false,
216+
runnerCancellationScope,
217+
nr.runnable,
218+
cache);
219+
callbackThreads.add(thread);
220+
}
221+
216222
// It is important to prepend threads as there are callbacks
217223
// like signals that have to run before any other threads.
218224
// Otherwise signal might be never processed if it was received
219225
// after workflow decided to close.
220-
threads.addFirst(thread);
226+
// Adding the callbacks in the same order as they appear in history.
227+
228+
for (int i = callbackThreads.size() - 1; i >= 0; i--) {
229+
threads.addFirst(callbackThreads.get(i));
230+
}
221231
}
232+
222233
toExecuteInWorkflowThread.clear();
223234
progress = false;
224235
Iterator<WorkflowThread> ci = threads.iterator();

src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919

2020
import com.uber.cadence.internal.common.InternalUtils;
2121
import com.uber.cadence.internal.worker.ActivityWorker;
22-
import com.uber.cadence.internal.worker.Lifecycle;
2322
import com.uber.cadence.internal.worker.SingleWorkerOptions;
23+
import com.uber.cadence.internal.worker.SuspendableWorker;
2424
import com.uber.cadence.serviceclient.IWorkflowService;
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.ScheduledExecutorService;
2727
import java.util.concurrent.TimeUnit;
2828

2929
/** Activity worker that supports POJO activity implementations. */
30-
public class SyncActivityWorker implements Lifecycle {
30+
public class SyncActivityWorker implements SuspendableWorker {
3131

3232
private final ActivityWorker worker;
3333
private final POJOActivityTaskHandler taskHandler;
@@ -83,11 +83,18 @@ public void awaitTermination(long timeout, TimeUnit unit) {
8383
InternalUtils.awaitTermination(heartbeatExecutor, timeoutMillis);
8484
}
8585

86+
@Override
8687
public void suspendPolling() {
8788
worker.suspendPolling();
8889
}
8990

91+
@Override
9092
public void resumePolling() {
9193
worker.resumePolling();
9294
}
95+
96+
@Override
97+
public boolean isSuspended() {
98+
return worker.isSuspended();
99+
}
93100
}

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.uber.cadence.internal.worker.Poller;
3636
import com.uber.cadence.internal.worker.PollerOptions;
3737
import com.uber.cadence.internal.worker.SingleWorkerOptions;
38+
import com.uber.cadence.internal.worker.Suspendable;
3839
import com.uber.cadence.internal.worker.WorkflowPollTaskFactory;
3940
import com.uber.cadence.serviceclient.IWorkflowService;
4041
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
@@ -63,7 +64,7 @@
6364
* Hosts activity and workflow implementations. Uses long poll to receive activity and decision
6465
* tasks and processes them in a correspondent thread pool.
6566
*/
66-
public final class Worker {
67+
public final class Worker implements Suspendable {
6768

6869
private final WorkerOptions options;
6970
private final String taskList;
@@ -364,6 +365,43 @@ public String getTaskList() {
364365
return taskList;
365366
}
366367

368+
@Override
369+
public void suspendPolling() {
370+
if (workflowWorker != null) {
371+
workflowWorker.suspendPolling();
372+
}
373+
374+
if (activityWorker != null) {
375+
activityWorker.suspendPolling();
376+
}
377+
}
378+
379+
@Override
380+
public void resumePolling() {
381+
if (workflowWorker != null) {
382+
workflowWorker.resumePolling();
383+
}
384+
385+
if (activityWorker != null) {
386+
activityWorker.resumePolling();
387+
}
388+
}
389+
390+
@Override
391+
public boolean isSuspended() {
392+
boolean workflowWorkerSuspended = true;
393+
if (workflowWorker != null) {
394+
workflowWorkerSuspended = workflowWorker.isSuspended();
395+
}
396+
397+
boolean activityWorkerSuspended = activityWorker.isSuspended();
398+
if (activityWorker != null) {
399+
activityWorker.resumePolling();
400+
}
401+
402+
return workflowWorkerSuspended && activityWorkerSuspended;
403+
}
404+
367405
/** Maintains worker creation and lifecycle. */
368406
public static final class Factory {
369407
private final List<Worker> workers = new ArrayList<>();
@@ -712,9 +750,40 @@ private String getStickyTaskListName() {
712750
: String.format("%s:%s", getHostName(), id);
713751
}
714752

753+
public synchronized void suspendPolling() {
754+
if (state != State.Started) {
755+
return;
756+
}
757+
758+
log.info("suspendPolling");
759+
state = State.Suspended;
760+
if (stickyPoller != null) {
761+
stickyPoller.suspendPolling();
762+
}
763+
for (Worker worker : workers) {
764+
worker.suspendPolling();
765+
}
766+
}
767+
768+
public synchronized void resumePolling() {
769+
if (state != State.Suspended) {
770+
return;
771+
}
772+
773+
log.info("resumePolling");
774+
state = State.Started;
775+
if (stickyPoller != null) {
776+
stickyPoller.resumePolling();
777+
}
778+
for (Worker worker : workers) {
779+
worker.resumePolling();
780+
}
781+
}
782+
715783
enum State {
716784
Initial,
717785
Started,
786+
Suspended,
718787
Shutdown
719788
}
720789
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4569,6 +4569,64 @@ public void testParallelLocalActivityExecutionWorkflow() {
45694569
result);
45704570
}
45714571

4572+
public interface SignalOrderingWorkflow {
4573+
@WorkflowMethod
4574+
List<String> run();
4575+
4576+
@SignalMethod(name = "testSignal")
4577+
void signal(String s);
4578+
}
4579+
4580+
public static class SignalOrderingWorkflowImpl implements SignalOrderingWorkflow {
4581+
private List<String> signals = new ArrayList<>();
4582+
4583+
@Override
4584+
public List<String> run() {
4585+
Workflow.await(() -> signals.size() == 3);
4586+
return signals;
4587+
}
4588+
4589+
@Override
4590+
public void signal(String s) {
4591+
signals.add(s);
4592+
}
4593+
}
4594+
4595+
@Test
4596+
public void testSignalOrderingWorkflow() {
4597+
startWorkerFor(SignalOrderingWorkflowImpl.class);
4598+
WorkflowOptions options =
4599+
new WorkflowOptions.Builder()
4600+
.setExecutionStartToCloseTimeout(Duration.ofMinutes(1))
4601+
.setTaskStartToCloseTimeout(Duration.ofSeconds(10))
4602+
.setTaskList(taskList)
4603+
.build();
4604+
SignalOrderingWorkflow workflowStub =
4605+
workflowClient.newWorkflowStub(SignalOrderingWorkflow.class, options);
4606+
WorkflowClient.start(workflowStub::run);
4607+
4608+
// Suspend polling so that all the signals will be received in the same decision task.
4609+
if (useExternalService) {
4610+
workerFactory.suspendPolling();
4611+
} else {
4612+
testEnvironment.getWorkerFactory().suspendPolling();
4613+
}
4614+
4615+
workflowStub.signal("test1");
4616+
workflowStub.signal("test2");
4617+
workflowStub.signal("test3");
4618+
4619+
if (useExternalService) {
4620+
workerFactory.resumePolling();
4621+
} else {
4622+
testEnvironment.getWorkerFactory().resumePolling();
4623+
}
4624+
4625+
List<String> result = workflowStub.run();
4626+
List<String> expected = Arrays.asList("test1", "test2", "test3");
4627+
assertEquals(expected, result);
4628+
}
4629+
45724630
private static class FilteredTrace {
45734631

45744632
private final List<String> impl = Collections.synchronizedList(new ArrayList<>());

0 commit comments

Comments
 (0)