Skip to content

Commit e503a2c

Browse files
authored
Fix local activity completion synchronization (#474)
1 parent 722d8cf commit e503a2c

File tree

2 files changed

+57
-31
lines changed

2 files changed

+57
-31
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.concurrent.CancellationException;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.locks.Condition;
42-
import java.util.concurrent.locks.Lock;
43-
import java.util.concurrent.locks.ReentrantLock;
4442
import java.util.function.BiConsumer;
4543
import java.util.function.BiFunction;
4644
import java.util.function.Consumer;
@@ -88,8 +86,7 @@ public void accept(Exception reason) {
8886
private final Map<String, ExecuteLocalActivityParameters> unstartedLaTasks = new HashMap<>();
8987
private final ReplayDecider replayDecider;
9088
private final DataConverter dataConverter;
91-
private final Lock laTaskLock = new ReentrantLock();
92-
private final Condition taskCondition = laTaskLock.newCondition();
89+
private final Condition taskCondition;
9390
private boolean taskCompleted = false;
9491

9592
ClockDecisionContext(
@@ -98,6 +95,7 @@ public void accept(Exception reason) {
9895
ReplayDecider replayDecider,
9996
DataConverter dataConverter) {
10097
this.decisions = decisions;
98+
this.taskCondition = replayDecider.getLock().newCondition();
10199
mutableSideEffectHandler =
102100
new MarkerHandler(decisions, MUTABLE_SIDE_EFFECT_MARKER_NAME, () -> replaying);
103101
versionHandler = new MarkerHandler(decisions, VERSION_MARKER_NAME, () -> replaying);
@@ -272,13 +270,9 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
272270
completionHandle.accept(marker.getResult(), failure);
273271
setReplayCurrentTimeMilliseconds(marker.getReplayTimeMillis());
274272

275-
laTaskLock.lock();
276-
try {
277-
taskCompleted = true;
278-
taskCondition.signal();
279-
} finally {
280-
laTaskLock.unlock();
281-
}
273+
taskCompleted = true;
274+
// This method is already called under the lock.
275+
taskCondition.signal();
282276
}
283277
}
284278

@@ -342,7 +336,7 @@ boolean startUnstartedLaTasks(Duration maxWaitAllowed) {
342336
laTaskPoller.apply(
343337
new LocalActivityWorker.Task(
344338
params,
345-
replayDecider,
339+
replayDecider.getLocalActivityCompletionSink(),
346340
replayDecider.getDecisionTimeoutSeconds(),
347341
this::currentTimeMillis,
348342
this::replayTimeUpdatedAtMillis),
@@ -360,14 +354,10 @@ int numPendingLaTasks() {
360354
}
361355

362356
void awaitTaskCompletion(Duration duration) throws InterruptedException {
363-
laTaskLock.lock();
364-
try {
365-
while (!taskCompleted) {
366-
taskCondition.awaitNanos(duration.toNanos());
367-
}
368-
taskCompleted = false;
369-
} finally {
370-
laTaskLock.unlock();
357+
while (!taskCompleted) {
358+
// This call is called from already locked object
359+
taskCondition.awaitNanos(duration.toNanos());
371360
}
361+
taskCompleted = false;
372362
}
373363
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.util.concurrent.CancellationException;
5252
import java.util.concurrent.TimeUnit;
5353
import java.util.concurrent.atomic.AtomicReference;
54+
import java.util.concurrent.locks.Lock;
55+
import java.util.concurrent.locks.ReentrantLock;
5456
import java.util.function.BiFunction;
5557
import java.util.function.Consumer;
5658
import org.apache.thrift.TException;
@@ -59,7 +61,7 @@
5961
* Implements decider that relies on replay of a workflow code. An instance of this class is created
6062
* per decision.
6163
*/
62-
class ReplayDecider implements Decider, Consumer<HistoryEvent> {
64+
class ReplayDecider implements Decider {
6365

6466
private static final int MAXIMUM_PAGE_SIZE = 10000;
6567

@@ -75,6 +77,8 @@ class ReplayDecider implements Decider, Consumer<HistoryEvent> {
7577
private final Scope metricsScope;
7678
private final long wfStartTimeNanos;
7779
private final WorkflowExecutionStartedEventAttributes startedEvent;
80+
private final Lock lock = new ReentrantLock();
81+
private final Consumer<HistoryEvent> localActivityCompletionSink;
7882

7983
ReplayDecider(
8084
IWorkflowService service,
@@ -100,6 +104,19 @@ class ReplayDecider implements Decider, Consumer<HistoryEvent> {
100104
context =
101105
new DecisionContextImpl(
102106
decisionsHelper, domain, decisionTask, startedEvent, options, laTaskPoller, this);
107+
localActivityCompletionSink =
108+
historyEvent -> {
109+
lock.lock();
110+
try {
111+
processEvent(historyEvent);
112+
} finally {
113+
lock.unlock();
114+
}
115+
};
116+
}
117+
118+
Lock getLock() {
119+
return lock;
103120
}
104121

105122
private void handleWorkflowExecutionStarted(HistoryEvent event) {
@@ -356,8 +373,13 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) {
356373

357374
@Override
358375
public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
359-
boolean forceCreateNewDecisionTask = decideImpl(decisionTask, null);
360-
return new DecisionResult(decisionsHelper.getDecisions(), forceCreateNewDecisionTask);
376+
lock.lock();
377+
try {
378+
boolean forceCreateNewDecisionTask = decideImpl(decisionTask, null);
379+
return new DecisionResult(decisionsHelper.getDecisions(), forceCreateNewDecisionTask);
380+
} finally {
381+
lock.unlock();
382+
}
361383
}
362384

363385
// Returns boolean to indicate whether we need to force create new decision task for local
@@ -536,19 +558,28 @@ int getDecisionTimeoutSeconds() {
536558

537559
@Override
538560
public void close() {
539-
workflow.close();
561+
lock.lock();
562+
try {
563+
workflow.close();
564+
} finally {
565+
lock.unlock();
566+
}
540567
}
541568

542569
@Override
543570
public byte[] query(PollForDecisionTaskResponse response, WorkflowQuery query) throws Throwable {
544-
AtomicReference<byte[]> result = new AtomicReference<>();
545-
decideImpl(response, () -> result.set(workflow.query(query)));
546-
return result.get();
571+
lock.lock();
572+
try {
573+
AtomicReference<byte[]> result = new AtomicReference<>();
574+
decideImpl(response, () -> result.set(workflow.query(query)));
575+
return result.get();
576+
} finally {
577+
lock.unlock();
578+
}
547579
}
548580

549-
@Override
550-
public void accept(HistoryEvent event) {
551-
processEvent(event);
581+
public Consumer<HistoryEvent> getLocalActivityCompletionSink() {
582+
return localActivityCompletionSink;
552583
}
553584

554585
private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
@@ -580,7 +611,12 @@ private final Duration retryServiceOperationExpirationInterval() {
580611

581612
@Override
582613
public PollForDecisionTaskResponse getDecisionTask() {
583-
return task;
614+
lock.lock();
615+
try {
616+
return task;
617+
} finally {
618+
lock.unlock();
619+
}
584620
}
585621

586622
@Override

0 commit comments

Comments
 (0)