Skip to content

Commit 225245e

Browse files
authored
Fixed thread exhaustion with a large number of parallel async activities (#234)
* Moved thread start to runUntilBlocked to delay grabbing a thread from the pool.
* Added testLargeHistory unit test.
* Fixed Retryer when maxAttempts or expiration is not specified.
1 parent 3b2b572 commit 225245e

File tree

6 files changed

+78
-17
lines changed

6 files changed

+78
-17
lines changed

src/main/java/com/uber/cadence/internal/common/Retryer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap;
2121

2222
import com.uber.cadence.common.RetryOptions;
23+
import java.time.Duration;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionException;
2526
import java.util.function.BiFunction;
@@ -105,8 +106,10 @@ public static <R, T extends Throwable> R retryWithResult(
105106
}
106107
}
107108
long elapsed = System.currentTimeMillis() - startTime;
108-
if (attempt >= options.getMaximumAttempts()
109-
|| elapsed >= options.getExpiration().toMillis()) {
109+
int maxAttempts = options.getMaximumAttempts();
110+
Duration expiration = options.getExpiration();
111+
if ((maxAttempts > 0 && attempt >= maxAttempts)
112+
|| (expiration != null && elapsed >= expiration.toMillis())) {
110113
rethrow(e);
111114
}
112115
log.warn("Retrying after failure", e);

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class WorkflowExecutionUtils {
9494
new RetryOptions.Builder()
9595
.setBackoffCoefficient(2)
9696
.setInitialInterval(Duration.ofMillis(500))
97-
.setMaximumInterval(Duration.ofSeconds(30))
97+
.setMaximumInterval(Duration.ofSeconds(10))
9898
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
9999
.build();
100100

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ public void runUntilAllBlocked() throws Throwable {
211211
// Otherwise signal might be never processed if it was received
212212
// after workflow decided to close.
213213
threads.addFirst(thread);
214-
thread.start();
215214
}
216215
toExecuteInWorkflowThread.clear();
217216
progress = false;

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadContext.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ class WorkflowThreadContext {
4949
}
5050

5151
public void initialYield() {
52-
if (getStatus() != Status.RUNNING) {
53-
throw new IllegalStateException("not in RUNNING but in " + getStatus() + " state");
52+
Status status = getStatus();
53+
if (status == Status.DONE) {
54+
throw new DestroyWorkflowThreadError("done in initialYield");
55+
}
56+
if (status != Status.RUNNING) {
57+
throw new IllegalStateException("not in RUNNING but in " + status + " state");
5458
}
5559
yield("created", () -> true);
5660
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,7 @@ private void setBlockedUntil(long blockedUntil) {
300300
@Override
301301
public boolean runUntilBlocked() {
302302
if (taskFuture == null) {
303-
// Thread is not yet started
304-
return false;
303+
start();
305304
}
306305
return context.runUntilBlocked();
307306
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.uber.cadence.common.RetryOptions;
4646
import com.uber.cadence.converter.JsonDataConverter;
4747
import com.uber.cadence.internal.sync.DeterministicRunnerTest;
48+
import com.uber.cadence.internal.worker.PollerOptions;
4849
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
4950
import com.uber.cadence.testing.TestEnvironmentOptions;
5051
import com.uber.cadence.testing.TestWorkflowEnvironment;
@@ -78,6 +79,7 @@
7879
import org.junit.After;
7980
import org.junit.Assert;
8081
import org.junit.Before;
82+
import org.junit.Ignore;
8183
import org.junit.Rule;
8284
import org.junit.Test;
8385
import org.junit.rules.TestName;
@@ -184,21 +186,21 @@ private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList
184186

185187
private static ActivityOptions newActivityOptions1(String taskList) {
186188
if (DEBUGGER_TIMEOUTS) {
187-
return new ActivityOptions.Builder()
188-
.setTaskList(taskList)
189-
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
190-
.setHeartbeatTimeout(Duration.ofSeconds(5))
191-
.setScheduleToStartTimeout(Duration.ofSeconds(5))
192-
.setStartToCloseTimeout(Duration.ofSeconds(10))
193-
.build();
194-
} else {
195189
return new ActivityOptions.Builder()
196190
.setTaskList(taskList)
197191
.setScheduleToCloseTimeout(Duration.ofSeconds(1000))
198192
.setHeartbeatTimeout(Duration.ofSeconds(1000))
199193
.setScheduleToStartTimeout(Duration.ofSeconds(1000))
200194
.setStartToCloseTimeout(Duration.ofSeconds(10000))
201195
.build();
196+
} else {
197+
return new ActivityOptions.Builder()
198+
.setTaskList(taskList)
199+
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
200+
.setHeartbeatTimeout(Duration.ofSeconds(5))
201+
.setScheduleToStartTimeout(Duration.ofSeconds(5))
202+
.setStartToCloseTimeout(Duration.ofSeconds(10))
203+
.build();
202204
}
203205
}
204206

@@ -223,7 +225,11 @@ public void setUp() {
223225
.build();
224226
workerFactory = new Worker.Factory(service, DOMAIN, factoryOptions);
225227
WorkerOptions workerOptions =
226-
new WorkerOptions.Builder().setInterceptorFactory(tracer).build();
228+
new WorkerOptions.Builder()
229+
.setActivityPollerOptions(new PollerOptions.Builder().setPollThreadCount(5).build())
230+
.setMaxConcurrentActivityExecutionSize(1000)
231+
.setInterceptorFactory(tracer)
232+
.build();
227233
worker = workerFactory.newWorker(taskList, workerOptions);
228234
workflowClient = WorkflowClient.newInstance(service, DOMAIN);
229235
WorkflowClientOptions clientOptions =
@@ -3645,6 +3651,56 @@ public void testGenericParametersWorkflow() throws ExecutionException, Interrupt
36453651
assertEquals(expectedResult, result);
36463652
}
36473653

3654+
public interface TestLargeWorkflow {
3655+
3656+
@WorkflowMethod
3657+
String execute(int activityCount, String taskList);
3658+
}
3659+
3660+
public interface TestLargeWorkflowActivity {
3661+
String activity();
3662+
}
3663+
3664+
public static class TestLargeWorkflowActivityImpl implements TestLargeWorkflowActivity {
3665+
3666+
@Override
3667+
public String activity() {
3668+
return "done";
3669+
}
3670+
}
3671+
3672+
public static class TestLargeHistory implements TestLargeWorkflow {
3673+
3674+
@Override
3675+
public String execute(int activityCount, String taskList) {
3676+
TestLargeWorkflowActivity activities =
3677+
Workflow.newActivityStub(TestLargeWorkflowActivity.class, newActivityOptions1(taskList));
3678+
List<Promise<String>> results = new ArrayList<>();
3679+
for (int i = 0; i < activityCount; i++) {
3680+
Promise<String> result = Async.function(activities::activity);
3681+
results.add(result);
3682+
}
3683+
Promise.allOf(results).get();
3684+
return "done";
3685+
}
3686+
}
3687+
3688+
@Test
3689+
@Ignore // Requires DEBUGGER_TIMEOUTS=true
3690+
public void testLargeHistory() {
3691+
final int activityCount = 1000;
3692+
worker.registerActivitiesImplementations(new TestLargeWorkflowActivityImpl());
3693+
startWorkerFor(TestLargeHistory.class);
3694+
TestLargeWorkflow workflowStub =
3695+
workflowClient.newWorkflowStub(
3696+
TestLargeWorkflow.class, newWorkflowOptionsBuilder(taskList).build());
3697+
long start = System.currentTimeMillis();
3698+
String result = workflowStub.execute(activityCount, taskList);
3699+
long duration = System.currentTimeMillis() - start;
3700+
log.info(testName.toString() + " duration is " + duration);
3701+
assertEquals("done", result);
3702+
}
3703+
36483704
private static class FilteredTrace {
36493705

36503706
private final List<String> impl = Collections.synchronizedList(new ArrayList<>());

0 commit comments

Comments
 (0)