Skip to content

Commit bf2124a

Browse files
authored
3. Thread based eviction (#203)
1 parent 60a7fdb commit bf2124a

File tree

8 files changed

+237
-24
lines changed

8 files changed

+237
-24
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunner.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.uber.cadence.internal.replay.DeciderCache;
2021
import com.uber.cadence.workflow.CancellationScope;
2122
import com.uber.cadence.workflow.Workflow;
2223
import java.util.concurrent.ExecutorService;
@@ -37,6 +38,25 @@ static DeterministicRunner newRunner(Supplier<Long> clock, Runnable root) {
3738
return new DeterministicRunnerImpl(clock, root);
3839
}
3940

41+
/**
42+
* Create new instance of DeterministicRunner
43+
*
44+
* @param decisionContext decision context to use
45+
* @param clock Supplier that returns current time that sync should use
46+
* @param root function that root thread of the runner executes.
47+
* @param cache DeciderCache used cache inflight workflows. New workflow threads will evict this
48+
* cache when the thread pool runs out
49+
* @return instance of the DeterministicRunner.
50+
*/
51+
static DeterministicRunner newRunner(
52+
ExecutorService threadPool,
53+
SyncDecisionContext decisionContext,
54+
Supplier<Long> clock,
55+
Runnable root,
56+
DeciderCache cache) {
57+
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, cache);
58+
}
59+
4060
/**
4161
* Create new instance of DeterministicRunner
4262
*
@@ -50,7 +70,7 @@ static DeterministicRunner newRunner(
5070
SyncDecisionContext decisionContext,
5171
Supplier<Long> clock,
5272
Runnable root) {
53-
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root);
73+
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, null);
5474
}
5575

5676
/**

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.converter.JsonDataConverter;
2424
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2525
import com.uber.cadence.internal.replay.ContinueAsNewWorkflowExecutionParameters;
26+
import com.uber.cadence.internal.replay.DeciderCache;
2627
import com.uber.cadence.internal.replay.DecisionContext;
2728
import com.uber.cadence.internal.replay.ExecuteActivityParameters;
2829
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
@@ -81,6 +82,7 @@ private NamedRunnable(String name, Runnable runnable) {
8182
private final List<WorkflowThread> threadsToAdd = Collections.synchronizedList(new ArrayList<>());
8283
private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
8384
private final Supplier<Long> clock;
85+
private DeciderCache cache;
8486
private boolean inRunUntilAllBlocked;
8587
private boolean closeRequested;
8688
private boolean closed;
@@ -118,7 +120,7 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
118120
}
119121

120122
DeterministicRunnerImpl(Supplier<Long> clock, Runnable root) {
121-
this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root);
123+
this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root, null);
122124
}
123125

124126
private static ThreadPoolExecutor getDefaultThreadPool() {
@@ -139,10 +141,20 @@ public Thread newThread(Runnable r) {
139141
SyncDecisionContext decisionContext,
140142
Supplier<Long> clock,
141143
Runnable root) {
144+
this(threadPool, decisionContext, clock, root, null);
145+
}
146+
147+
DeterministicRunnerImpl(
148+
ExecutorService threadPool,
149+
SyncDecisionContext decisionContext,
150+
Supplier<Long> clock,
151+
Runnable root,
152+
DeciderCache cache) {
142153
this.threadPool = threadPool;
143154
this.decisionContext =
144155
decisionContext != null ? decisionContext : newDummySyncDecisionContext();
145156
this.clock = clock;
157+
this.cache = cache;
146158
runnerCancellationScope = new CancellationScopeImpl(true, null, null);
147159
// TODO: workflow instance specific thread name
148160
rootWorkflowThread =
@@ -153,7 +165,8 @@ public Thread newThread(Runnable r) {
153165
WORKFLOW_ROOT_THREAD_NAME,
154166
false,
155167
runnerCancellationScope,
156-
root);
168+
root,
169+
cache);
157170
threads.addLast(rootWorkflowThread);
158171
rootWorkflowThread.start();
159172
}
@@ -183,7 +196,14 @@ public void runUntilAllBlocked() throws Throwable {
183196
for (NamedRunnable nr : toExecuteInWorkflowThread) {
184197
WorkflowThread thread =
185198
new WorkflowThreadImpl(
186-
false, threadPool, this, nr.name, false, runnerCancellationScope, nr.runnable);
199+
false,
200+
threadPool,
201+
this,
202+
nr.name,
203+
false,
204+
runnerCancellationScope,
205+
nr.runnable,
206+
cache);
187207
// It is important to prepend threads as there are callbacks
188208
// like signals that have to run before any other threads.
189209
// Otherwise signal might be never processed if it was received
@@ -363,7 +383,14 @@ WorkflowThread newThread(Runnable runnable, boolean detached, String name) {
363383
checkClosed();
364384
WorkflowThread result =
365385
new WorkflowThreadImpl(
366-
false, threadPool, this, name, detached, CancellationScopeImpl.current(), runnable);
386+
false,
387+
threadPool,
388+
this,
389+
name,
390+
detached,
391+
CancellationScopeImpl.current(),
392+
runnable,
393+
cache);
367394
threadsToAdd.add(result); // This is synchronized collection.
368395
return result;
369396
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2525
import com.uber.cadence.internal.common.InternalUtils;
2626
import com.uber.cadence.internal.metrics.MetricsType;
27+
import com.uber.cadence.internal.replay.DeciderCache;
2728
import com.uber.cadence.internal.replay.ReplayWorkflow;
2829
import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
2930
import com.uber.cadence.internal.worker.WorkflowExecutionException;
@@ -67,16 +68,19 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
6768

6869
private final ExecutorService threadPool;
6970
private final Scope metricsScope;
71+
private DeciderCache cache;
7072

7173
POJOWorkflowImplementationFactory(
7274
DataConverter dataConverter,
7375
ExecutorService threadPool,
7476
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
75-
Scope metricsScope) {
77+
Scope metricsScope,
78+
DeciderCache cache) {
7679
this.dataConverter = Objects.requireNonNull(dataConverter);
7780
this.threadPool = Objects.requireNonNull(threadPool);
7881
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
7982
this.metricsScope = metricsScope;
83+
this.cache = cache;
8084
}
8185

8286
void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {
@@ -183,7 +187,7 @@ public void setDataConverter(DataConverter dataConverter) {
183187
@Override
184188
public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
185189
SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType);
186-
return new SyncWorkflow(workflow, dataConverter, threadPool, interceptorFactory);
190+
return new SyncWorkflow(workflow, dataConverter, threadPool, interceptorFactory, cache);
187191
}
188192

189193
@Override

src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.WorkflowType;
2424
import com.uber.cadence.client.WorkflowClient;
2525
import com.uber.cadence.converter.DataConverter;
26+
import com.uber.cadence.internal.replay.DeciderCache;
2627
import com.uber.cadence.internal.replay.DecisionContext;
2728
import com.uber.cadence.internal.replay.ReplayWorkflow;
2829
import com.uber.cadence.internal.worker.WorkflowExecutionException;
@@ -41,18 +42,21 @@ class SyncWorkflow implements ReplayWorkflow {
4142
private final ExecutorService threadPool;
4243
private final SyncWorkflowDefinition workflow;
4344
private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
45+
private DeciderCache cache;
4446
private WorkflowRunnable workflowProc;
4547
private DeterministicRunner runner;
4648

4749
public SyncWorkflow(
4850
SyncWorkflowDefinition workflow,
4951
DataConverter dataConverter,
5052
ExecutorService threadPool,
51-
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory) {
53+
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
54+
DeciderCache cache) {
5255
this.workflow = Objects.requireNonNull(workflow);
5356
this.dataConverter = Objects.requireNonNull(dataConverter);
5457
this.threadPool = Objects.requireNonNull(threadPool);
5558
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
59+
this.cache = cache;
5660
}
5761

5862
@Override
@@ -74,7 +78,7 @@ public void start(HistoryEvent event, DecisionContext context) {
7478
syncContext, workflow, event.getWorkflowExecutionStartedEventAttributes());
7579
runner =
7680
DeterministicRunner.newRunner(
77-
threadPool, syncContext, context::currentTimeMillis, workflowProc);
81+
threadPool, syncContext, context::currentTimeMillis, workflowProc, cache);
7882
syncContext.setRunner(runner);
7983
}
8084

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public SyncWorkflowWorker(
6060
options.getDataConverter(),
6161
workflowThreadPool,
6262
interceptorFactory,
63-
options.getMetricsScope());
63+
options.getMetricsScope(),
64+
cache);
65+
6466
DecisionTaskHandler taskHandler =
6567
new ReplayDecisionTaskHandler(
6668
domain,

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import com.uber.cadence.internal.logging.LoggerTag;
21+
import com.uber.cadence.internal.replay.DeciderCache;
2122
import com.uber.cadence.internal.replay.DecisionContext;
2223
import com.uber.cadence.workflow.Promise;
2324
import java.io.PrintWriter;
@@ -148,6 +149,7 @@ public void setName(String name) {
148149
private final boolean root;
149150
private final ExecutorService threadPool;
150151
private final WorkflowThreadContext context;
152+
private DeciderCache cache;
151153
private final DeterministicRunnerImpl runner;
152154
private final RunnableWrapper task;
153155
private Thread thread;
@@ -169,12 +171,14 @@ public void setName(String name) {
169171
String name,
170172
boolean detached,
171173
CancellationScopeImpl parentCancellationScope,
172-
Runnable runnable) {
174+
Runnable runnable,
175+
DeciderCache cache) {
173176
this.root = root;
174177
this.threadPool = threadPool;
175178
this.runner = runner;
176179
this.context = new WorkflowThreadContext(runner.getLock());
177-
// TODO: Use thread pool instead of creating new threads.
180+
this.cache = cache;
181+
178182
if (name == null) {
179183
name = "workflow-" + super.hashCode();
180184
}
@@ -224,6 +228,14 @@ public void start() {
224228
throw new IllegalThreadStateException("already started");
225229
}
226230
context.setStatus(Status.RUNNING);
231+
232+
try {
233+
taskFuture = threadPool.submit(task);
234+
return;
235+
} catch (RejectedExecutionException e) {
236+
cache.evictNext();
237+
}
238+
227239
try {
228240
taskFuture = threadPool.submit(task);
229241
} catch (RejectedExecutionException e) {

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ public static final class Factory {
342342
private final String domain;
343343
private final UUID id =
344344
UUID.randomUUID(); // Guarantee uniqueness for stickyTaskListName when multiple factories
345-
// are created.
346345
private final ThreadPoolExecutor workflowThreadPool;
347346
private final AtomicInteger workflowThreadCounter = new AtomicInteger();
348347
private final FactoryOptions factoryOptions;

0 commit comments

Comments
 (0)