Skip to content

Commit b696a14

Browse files
authored
Lock decider while processing (#325)
* Lock decider while processing * Start decision task should not look at sticky schedule to start timeout * Move workflow run lock into workflow worker * Add unit test for local activity heartbeating and query
1 parent f47812c commit b696a14

File tree

7 files changed

+358
-14
lines changed

7 files changed

+358
-14
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,11 @@ private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.P
405405
}
406406

407407
forceCreateNewDecisionTask =
408-
processEventLoop(startTime, startedEvent.getTaskStartToCloseTimeoutSeconds(), decision);
408+
processEventLoop(
409+
startTime,
410+
startedEvent.getTaskStartToCloseTimeoutSeconds(),
411+
decision,
412+
decisionTask.getQuery() != null);
409413

410414
mayBeCompleteWorkflow();
411415
if (decision.isReplay()) {
@@ -443,11 +447,12 @@ private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.P
443447
}
444448
}
445449

446-
private boolean processEventLoop(long startTime, int decisionTimeoutSecs, DecisionEvents decision)
450+
private boolean processEventLoop(
451+
long startTime, int decisionTimeoutSecs, DecisionEvents decision, boolean isQuery)
447452
throws Throwable {
448453
eventLoop();
449454

450-
if (decision.isReplay()) {
455+
if (decision.isReplay() || isQuery) {
451456
return replayLocalActivities(decision);
452457
} else {
453458
return executeLocalActivities(startTime, decisionTimeoutSecs);

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ public SyncWorkflowWorker(
8888
service,
8989
laWorker.getLocalActivityTaskPoller());
9090

91-
workflowWorker = new WorkflowWorker(service, domain, taskList, workflowOptions, taskHandler);
91+
workflowWorker =
92+
new WorkflowWorker(
93+
service, domain, taskList, workflowOptions, taskHandler, stickyTaskListName);
9294
}
9395

9496
public void setWorkflowImplementationTypes(

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,7 @@ public void startDecisionTask(
216216
long scheduledEventId = decision.getData().scheduledEventId;
217217
decision.action(StateMachines.Action.START, ctx, pollRequest, 0);
218218
ctx.addTimer(
219-
stickyExecutionAttributes != null
220-
? stickyExecutionAttributes.getScheduleToStartTimeoutSeconds()
221-
: startRequest.getTaskStartToCloseTimeoutSeconds(),
219+
startRequest.getTaskStartToCloseTimeoutSeconds(),
222220
() -> timeoutDecisionTask(scheduledEventId),
223221
"DecisionTask StartToCloseTimeout");
224222
});
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.worker;
19+
20+
import java.util.HashMap;
21+
import java.util.concurrent.locks.Lock;
22+
import java.util.concurrent.locks.ReentrantLock;
23+
24+
/**
 * Hands out one {@link Lock} per workflow run id and tracks how many callers currently hold or
 * are waiting for each of them, so that an entry can be dropped from the internal map as soon as
 * nobody needs it anymore.
 *
 * <p>Every call to {@link #getLockForLocking(String)} must be paired with exactly one call to
 * {@link #unlock(String)} made by the thread that locked the returned lock.
 *
 * <p>All access to the map and to the per-run counters happens while {@code mapLock} is held.
 */
final class WorkflowRunLockManager {

  /** A run lock together with the number of outstanding {@code getLockForLocking} requests. */
  private static class CountableLock {
    private final Lock lock = new ReentrantLock();
    // Starts at 1 because an instance is only created on behalf of its first requester.
    private int count = 1;

    void incrementCount() {
      count++;
    }

    void decrementCount() {
      count--;
    }

    int getCount() {
      return count;
    }

    Lock getLock() {
      return lock;
    }
  }

  private final Lock mapLock = new ReentrantLock();
  private final HashMap<String, CountableLock> perRunLock = new HashMap<>();

  /**
   * This method returns a lock that can be used to serialize decision task processing for a
   * particular workflow run. This is used to make sure that query tasks and real decision tasks
   * are serialized when sticky is on.
   *
   * <p>The returned lock is NOT locked by this method; the caller is expected to call {@link
   * Lock#lock()} on it and later release it through {@link #unlock(String)}.
   *
   * @param runId id of the workflow run whose processing should be serialized
   * @return a lock to be used during decision task processing
   */
  Lock getLockForLocking(String runId) {
    mapLock.lock();

    try {
      CountableLock cl = perRunLock.get(runId);
      if (cl == null) {
        cl = new CountableLock();
        perRunLock.put(runId, cl);
      } else {
        cl.incrementCount();
      }

      return cl.getLock();
    } finally {
      mapLock.unlock();
    }
  }

  /**
   * Releases the lock of the given run and forgets the run once no requester is left.
   *
   * @param runId id of the workflow run whose lock is released
   * @throws IllegalStateException if no lock is registered for {@code runId}
   * @throws IllegalMonitorStateException if the calling thread does not hold the run lock; the
   *     internal bookkeeping is left untouched in that case
   */
  void unlock(String runId) {
    mapLock.lock();

    try {
      CountableLock cl = perRunLock.get(runId);
      if (cl == null) {
        throw new IllegalStateException("lock for run " + runId + " does not exist.");
      }

      // Release the run lock BEFORE touching the bookkeeping: if the caller is not the owner,
      // unlock() throws and the count/map must stay consistent. Otherwise the entry could be
      // removed while another thread still holds the lock, and a later request would create a
      // second lock for the same run.
      cl.getLock().unlock();

      cl.decrementCount();
      if (cl.getCount() == 0) {
        perRunLock.remove(runId);
      }
    } finally {
      mapLock.unlock();
    }
  }

  /**
   * Returns the number of runs that currently have a registered lock.
   *
   * @return number of entries in the per-run lock map
   */
  int totalLocks() {
    mapLock.lock();

    try {
      return perRunLock.size();
    } finally {
      mapLock.unlock();
    }
  }
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20+
import com.google.common.base.Strings;
2021
import com.uber.cadence.BadRequestError;
2122
import com.uber.cadence.DomainNotActiveError;
2223
import com.uber.cadence.EntityNotExistsError;
@@ -46,6 +47,7 @@
4647
import java.util.List;
4748
import java.util.Objects;
4849
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.locks.Lock;
4951
import java.util.function.Consumer;
5052
import org.apache.thrift.TException;
5153
import org.slf4j.MDC;
@@ -62,17 +64,21 @@ public final class WorkflowWorker
6264
private final String domain;
6365
private final String taskList;
6466
private final SingleWorkerOptions options;
67+
private final String stickyTaskListName;
68+
private final WorkflowRunLockManager runLocks = new WorkflowRunLockManager();
6569

6670
public WorkflowWorker(
6771
IWorkflowService service,
6872
String domain,
6973
String taskList,
7074
SingleWorkerOptions options,
71-
DecisionTaskHandler handler) {
75+
DecisionTaskHandler handler,
76+
String stickyTaskListName) {
7277
this.service = Objects.requireNonNull(service);
7378
this.domain = Objects.requireNonNull(domain);
7479
this.taskList = Objects.requireNonNull(taskList);
7580
this.handler = handler;
81+
this.stickyTaskListName = stickyTaskListName;
7682

7783
PollerOptions pollerOptions = options.getPollerOptions();
7884
if (pollerOptions.getPollThreadNamePrefix() == null) {
@@ -240,6 +246,13 @@ public void handle(PollForDecisionTaskResponse task) throws Exception {
240246
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
241247
MDC.put(LoggerTag.WORKFLOW_TYPE, task.getWorkflowType().getName());
242248
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
249+
250+
Lock runLock = null;
251+
if (!Strings.isNullOrEmpty(stickyTaskListName)) {
252+
runLock = runLocks.getLockForLocking(task.getWorkflowExecution().getRunId());
253+
runLock.lock();
254+
}
255+
243256
try {
244257
Stopwatch sw = metricsScope.timer(MetricsType.DECISION_EXECUTION_LATENCY).start();
245258
DecisionTaskHandler.Result response = handler.handleDecisionTask(task);
@@ -254,6 +267,10 @@ public void handle(PollForDecisionTaskResponse task) throws Exception {
254267
MDC.remove(LoggerTag.WORKFLOW_ID);
255268
MDC.remove(LoggerTag.WORKFLOW_TYPE);
256269
MDC.remove(LoggerTag.RUN_ID);
270+
271+
if (runLock != null) {
272+
runLocks.unlock(task.getWorkflowExecution().getRunId());
273+
}
257274
}
258275
}
259276

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.worker;
19+
20+
import static org.junit.Assert.assertArrayEquals;
21+
import static org.junit.Assert.assertEquals;
22+
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.locks.Lock;
29+
import org.junit.Test;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
public class WorkflowRunLockManagerTest {
34+
private static final Logger log = LoggerFactory.getLogger(WorkflowRunLockManagerTest.class);
35+
private WorkflowRunLockManager runLockManager = new WorkflowRunLockManager();
36+
37+
@Test
38+
public void lockAndUnlockTest() throws ExecutionException, InterruptedException {
39+
ExecutorService executor = Executors.newFixedThreadPool(4);
40+
ConcurrentLinkedQueue<String> finishedTasks = new ConcurrentLinkedQueue<>();
41+
Future<?> f1 = executor.submit(() -> finishedTasks.add(processTask("run1", 1)));
42+
Thread.sleep(100);
43+
Future<?> f3 = executor.submit(() -> finishedTasks.add(processTask("run1", 2)));
44+
Future<?> f2 = executor.submit(() -> finishedTasks.add(processTask("run2", 1)));
45+
Thread.sleep(100);
46+
Future<?> f4 = executor.submit(() -> finishedTasks.add(processTask("run1", 3)));
47+
48+
f1.get();
49+
f2.get();
50+
f3.get();
51+
f4.get();
52+
53+
log.info("All done.");
54+
assertEquals(0, runLockManager.totalLocks());
55+
String[] expectedTasks = {"run1.1", "run2.1", "run1.2", "run1.3"};
56+
String[] processedTasks = new String[4];
57+
assertArrayEquals(expectedTasks, finishedTasks.toArray(processedTasks));
58+
}
59+
60+
private String processTask(String runId, int taskId) {
61+
Lock runLock = runLockManager.getLockForLocking(runId);
62+
runLock.lock();
63+
64+
log.info("Got lock runId " + runId + " taskId " + taskId);
65+
try {
66+
Thread.sleep(1000);
67+
} catch (InterruptedException e) {
68+
throw new RuntimeException("interrupted");
69+
} finally {
70+
runLockManager.unlock(runId);
71+
}
72+
log.info("Finished processing runId " + runId + " taskId " + taskId);
73+
return runId + "." + taskId;
74+
}
75+
}

0 commit comments

Comments
 (0)