Skip to content

Commit caa1f5b

Browse files
authored
Cleanup threads created in signal method on decider close (#405)
1 parent d798e6d commit caa1f5b

File tree

3 files changed

+73
-1
lines changed

3 files changed

+73
-1
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,17 @@
1919

2020
import static com.uber.cadence.internal.common.InternalUtils.createStickyTaskList;
2121

22-
import com.uber.cadence.*;
22+
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
23+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
24+
import com.uber.cadence.HistoryEvent;
25+
import com.uber.cadence.PollForDecisionTaskResponse;
26+
import com.uber.cadence.QueryTaskCompletedType;
27+
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
28+
import com.uber.cadence.RespondDecisionTaskFailedRequest;
29+
import com.uber.cadence.RespondQueryTaskCompletedRequest;
30+
import com.uber.cadence.StickyExecutionAttributes;
31+
import com.uber.cadence.WorkflowExecution;
32+
import com.uber.cadence.WorkflowType;
2333
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2434
import com.uber.cadence.internal.metrics.MetricsType;
2535
import com.uber.cadence.internal.worker.DecisionTaskHandler;

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,11 @@ public void close() {
320320
return;
321321
}
322322
try {
323+
for (WorkflowThread c : threadsToAdd) {
324+
threads.addLast(c);
325+
}
326+
threadsToAdd.clear();
327+
323328
for (WorkflowThread c : threads) {
324329
threadFutures.add(c.stopNow());
325330
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5194,6 +5194,63 @@ public void testSagaParallelCompensation() {
51945194
assertTrue(tracer.getTrace().contains("executeActivity TestActivities::activity2"));
51955195
}
51965196

5197+
public static class TestSignalExceptionWorkflowImpl implements TestWorkflowSignaled {
5198+
private boolean signaled = false;
5199+
5200+
@Override
5201+
public String execute() {
5202+
Workflow.await(() -> signaled);
5203+
return null;
5204+
}
5205+
5206+
@Override
5207+
public void signal1(String arg) {
5208+
for (int i = 0; i < 100; i++) {
5209+
Async.procedure(() -> System.out.println("test"));
5210+
}
5211+
5212+
throw new RuntimeException("exception in signal method");
5213+
}
5214+
}
5215+
5216+
@Test
5217+
public void testExceptionInSignal() throws InterruptedException {
5218+
startWorkerFor(TestSignalExceptionWorkflowImpl.class);
5219+
TestWorkflowSignaled signalWorkflow =
5220+
workflowClient.newWorkflowStub(
5221+
TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build());
5222+
CompletableFuture<String> result = WorkflowClient.execute(signalWorkflow::execute);
5223+
signalWorkflow.signal1("test");
5224+
try {
5225+
result.get(1, TimeUnit.SECONDS);
5226+
fail("not reachable");
5227+
} catch (Exception e) {
5228+
// exception expected here.
5229+
}
5230+
5231+
// Suspend polling so that decision tasks are not retried. Otherwise it will affect our thread
5232+
// count.
5233+
if (useExternalService) {
5234+
workerFactory.suspendPolling();
5235+
} else {
5236+
testEnvironment.getWorkerFactory().suspendPolling();
5237+
}
5238+
5239+
// Wait for decision task retry to finish.
5240+
Thread.sleep(10000);
5241+
5242+
int workflowThreads = 0;
5243+
ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
5244+
for (ThreadInfo thread : threads) {
5245+
if (thread.getThreadName().startsWith("workflow")) {
5246+
workflowThreads++;
5247+
}
5248+
}
5249+
5250+
assertTrue(
5251+
"workflow threads might leak, #workflowThreads = " + workflowThreads, workflowThreads < 20);
5252+
}
5253+
51975254
private static class TracingWorkflowInterceptor implements WorkflowInterceptor {
51985255

51995256
private final FilteredTrace trace;

0 commit comments

Comments
 (0)