diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java b/src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java index ef275007b..591f47bb7 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java @@ -31,10 +31,11 @@ public class SyncActivityWorker implements SuspendableWorker { private final ActivityWorker worker; private final POJOActivityTaskHandler taskHandler; - private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4); + private final ScheduledExecutorService heartbeatExecutor; public SyncActivityWorker( IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) { + heartbeatExecutor = options.getExecutorWrapper().wrap(Executors.newScheduledThreadPool(4)); taskHandler = new POJOActivityTaskHandler(service, domain, options.getDataConverter(), heartbeatExecutor); worker = new ActivityWorker(service, domain, taskList, options, taskHandler); diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java index df5fe6de7..07cba4b0e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java @@ -32,6 +32,7 @@ import com.uber.cadence.internal.worker.SuspendableWorker; import com.uber.cadence.internal.worker.WorkflowWorker; import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.worker.ExecutorWrapper; import com.uber.cadence.worker.WorkflowImplementationOptions; import com.uber.cadence.workflow.Functions.Func; import com.uber.cadence.workflow.WorkflowInterceptor; @@ -51,8 +52,8 @@ public class SyncWorkflowWorker private final POJOWorkflowImplementationFactory factory; private final DataConverter dataConverter; private final POJOActivityTaskHandler laTaskHandler; - private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4); - private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4); + private final ScheduledExecutorService heartbeatExecutor; + private final ScheduledExecutorService ldaHeartbeatExecutor; private SuspendableWorker ldaWorker; private POJOActivityTaskHandler ldaTaskHandler; private final IWorkflowService service; @@ -73,6 +74,11 @@ public SyncWorkflowWorker( this.dataConverter = workflowOptions.getDataConverter(); this.service = service; + // heartbeat executors + ExecutorWrapper executorWrapper = localActivityOptions.getExecutorWrapper(); + heartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4)); + ldaHeartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4)); + factory = new POJOWorkflowImplementationFactory( workflowOptions.getDataConverter(), diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index d2b73eb4b..9e83156d5 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -100,7 +100,8 @@ public void start() { getOrCreateActivityPollTask(), new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)), options.getPollerOptions(), - options.getMetricsScope()); + options.getMetricsScope(), + options.getExecutorWrapper()); poller.start(); setPoller(poller); options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 155e6edea..01ffde191 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -83,7 +83,8 @@ public void start() { laPollTask, new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)), options.getPollerOptions(), - options.getMetricsScope()); + options.getMetricsScope(), + options.getExecutorWrapper()); poller.start(); setPoller(poller); options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java b/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java index db8f4104c..3370f7ebd 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java @@ -48,12 +48,15 @@ public interface TaskHandler { this.options = options; taskExecutor = - new ThreadPoolExecutor( - 0, - options.getTaskExecutorThreadPoolSize(), - 1, - TimeUnit.SECONDS, - new SynchronousQueue<>()); + options + .getExecutorWrapper() + .wrap( + new ThreadPoolExecutor( + 0, + options.getTaskExecutorThreadPoolSize(), + 1, + TimeUnit.SECONDS, + new SynchronousQueue<>())); taskExecutor.setThreadFactory( new ExecutorThreadFactory( options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"), diff --git a/src/main/java/com/uber/cadence/internal/worker/Poller.java b/src/main/java/com/uber/cadence/internal/worker/Poller.java index 3803360d4..5a81abcb5 100644 --- a/src/main/java/com/uber/cadence/internal/worker/Poller.java +++ b/src/main/java/com/uber/cadence/internal/worker/Poller.java @@ -22,6 +22,7 @@ import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.worker.autoscaler.AutoScaler; import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory; +import com.uber.cadence.worker.ExecutorWrapper; import com.uber.m3.tally.Scope; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; @@ -72,17 +73,21 @@ interface ThrowingRunnable { private final AutoScaler pollerAutoScaler; + private final ExecutorWrapper executorWrapper; + public Poller( String identity, PollTask pollTask, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - Scope metricsScope) { + Scope metricsScope, + ExecutorWrapper executorWrapper) { Objects.requireNonNull(identity, "identity cannot be null"); Objects.requireNonNull(pollTask, "poll service should not be null"); Objects.requireNonNull(taskExecutor, "taskExecutor should not be null"); Objects.requireNonNull(pollerOptions, "pollerOptions should not be null"); Objects.requireNonNull(metricsScope, "metricsScope should not be null"); + Objects.requireNonNull(metricsScope, "executorWrapper should not be null"); this.identity = identity; this.pollTask = pollTask; @@ -90,6 +95,7 @@ public Poller( this.pollerOptions = pollerOptions; this.metricsScope = metricsScope; this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions); + this.executorWrapper = executorWrapper; } @Override @@ -109,12 +115,13 @@ public void start() { // As task enqueues next task the buffering is needed to queue task until the previous one // releases a thread. pollExecutor = - new ThreadPoolExecutor( - pollerOptions.getPollThreadCount(), - pollerOptions.getPollThreadCount(), - 1, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount())); + executorWrapper.wrap( + new ThreadPoolExecutor( + pollerOptions.getPollThreadCount(), + pollerOptions.getPollThreadCount(), + 1, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()))); pollExecutor.setThreadFactory( new ExecutorThreadFactory( pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler())); diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index 716a9eed7..c7026ff6e 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -21,6 +21,7 @@ import com.uber.cadence.converter.DataConverter; import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.metrics.NoopScope; +import com.uber.cadence.worker.ExecutorWrapper; import com.uber.m3.tally.Scope; import io.opentracing.Tracer; import java.time.Duration; @@ -46,6 +47,7 @@ public static final class Builder { private boolean enableLoggingInReplay; private List contextPropagators; private Tracer tracer; + private ExecutorWrapper executorWrapper; private Builder() {} @@ -59,6 +61,7 @@ public Builder(SingleWorkerOptions options) { this.enableLoggingInReplay = options.getEnableLoggingInReplay(); this.contextPropagators = options.getContextPropagators(); this.tracer = options.getTracer(); + this.executorWrapper = options.getExecutorWrapper(); } public Builder setIdentity(String identity) { @@ -107,6 +110,11 @@ public Builder setTracer(Tracer tracer) { return this; } + public Builder setExecutorWrapper(ExecutorWrapper executorWrapper) { + this.executorWrapper = executorWrapper; + return this; + } + public SingleWorkerOptions build() { if (pollerOptions == null) { pollerOptions = @@ -134,7 +142,8 @@ public SingleWorkerOptions build() { metricsScope, enableLoggingInReplay, contextPropagators, - tracer); + tracer, + executorWrapper); } } @@ -147,6 +156,7 @@ public SingleWorkerOptions build() { private final boolean enableLoggingInReplay; private List contextPropagators; private final Tracer tracer; + private final ExecutorWrapper executorWrapper; private SingleWorkerOptions( String identity, @@ -157,7 +167,8 @@ private SingleWorkerOptions( Scope metricsScope, boolean enableLoggingInReplay, List contextPropagators, - Tracer tracer) { + Tracer tracer, + ExecutorWrapper executorWrapper) { this.identity = identity; this.dataConverter = dataConverter; this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; @@ -167,6 +178,7 @@ private SingleWorkerOptions( this.enableLoggingInReplay = enableLoggingInReplay; this.contextPropagators = contextPropagators; this.tracer = tracer; + this.executorWrapper = executorWrapper; } public String getIdentity() { @@ -204,4 +216,8 @@ public List getContextPropagators() { public Tracer getTracer() { return tracer; } + + public ExecutorWrapper getExecutorWrapper() { + return executorWrapper; + } } diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index f9b537d3b..e77a9491a 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -112,7 +112,8 @@ public void start() { options.getIdentity()), pollTaskExecutor, options.getPollerOptions(), - options.getMetricsScope()); + options.getMetricsScope(), + options.getExecutorWrapper()); poller.start(); setPoller(poller); options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/src/main/java/com/uber/cadence/worker/ExecutorWrapper.java b/src/main/java/com/uber/cadence/worker/ExecutorWrapper.java new file mode 100644 index 000000000..6893cb195 --- /dev/null +++ b/src/main/java/com/uber/cadence/worker/ExecutorWrapper.java @@ -0,0 +1,48 @@ +/* + * Modifications Copyright (c) 2017-2021 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.uber.cadence.worker; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +public interface ExecutorWrapper { + ExecutorService wrap(ExecutorService delegate); + + ThreadPoolExecutor wrap(ThreadPoolExecutor delegate); + + ScheduledExecutorService wrap(ScheduledExecutorService delegate); + + static ExecutorWrapper newDefaultInstance() { + return new ExecutorWrapper() { + @Override + public ExecutorService wrap(ExecutorService delegate) { + return delegate; + } + + @Override + public ThreadPoolExecutor wrap(ThreadPoolExecutor delegate) { + return delegate; + } + + @Override + public ScheduledExecutorService wrap(ScheduledExecutorService delegate) { + return delegate; + } + }; + } +} diff --git a/src/main/java/com/uber/cadence/worker/ShadowingWorker.java b/src/main/java/com/uber/cadence/worker/ShadowingWorker.java index d7305d17f..892128d70 100644 --- a/src/main/java/com/uber/cadence/worker/ShadowingWorker.java +++ b/src/main/java/com/uber/cadence/worker/ShadowingWorker.java @@ -113,6 +113,7 @@ public ShadowingWorker( .setTaskListActivitiesPerSecond(options.getTaskListActivitiesPerSecond()) .setPollerOptions(options.getActivityPollerOptions()) .setMetricsScope(metricsScope) + .setExecutorWrapper(testOptions.getWorkerFactoryOptions().getExecutorWrapper()) .build(); activityWorker = new SyncActivityWorker( diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 6ce2fa56e..9d4e2d626 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -102,6 +102,7 @@ public final class Worker implements Suspendable { .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) .setContextPropagators(contextPropagators) .setTracer(options.getTracer()) + .setExecutorWrapper(factoryOptions.getExecutorWrapper()) .build(); activityWorker = new SyncActivityWorker( @@ -117,6 +118,7 @@ public final class Worker implements Suspendable { .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) .setContextPropagators(contextPropagators) .setTracer(options.getTracer()) + .setExecutorWrapper(factoryOptions.getExecutorWrapper()) .build(); SingleWorkerOptions localActivityOptions = SingleWorkerOptions.newBuilder() @@ -128,6 +130,7 @@ public final class Worker implements Suspendable { .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) .setContextPropagators(contextPropagators) .setTracer(options.getTracer()) + .setExecutorWrapper(factoryOptions.getExecutorWrapper()) .build(); workflowWorker = new SyncWorkflowWorker( diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index 06113e067..2b3189ecb 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -94,12 +94,15 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory MoreObjects.firstNonNull(factoryOptions, WorkerFactoryOptions.defaultInstance()); workflowThreadPool = - new ThreadPoolExecutor( - 0, - this.factoryOptions.getMaxWorkflowThreadCount(), - 1, - TimeUnit.SECONDS, - new SynchronousQueue<>()); + factoryOptions + .getExecutorWrapper() + .wrap( + new ThreadPoolExecutor( + 0, + this.factoryOptions.getMaxWorkflowThreadCount(), + 1, + TimeUnit.SECONDS, + new SynchronousQueue<>())); workflowThreadPool.setThreadFactory( r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet())); @@ -136,7 +139,8 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory .setPollThreadNamePrefix(POLL_THREAD_NAME) .setPollThreadCount(this.factoryOptions.getStickyPollerCount()) .build(), - stickyScope); + stickyScope, + factoryOptions.getExecutorWrapper()); } /** diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java index 6d197a0c2..a1bb5dd6e 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java @@ -48,6 +48,7 @@ public static class Builder { private int maxWorkflowThreadCount = DEFAULT_MAX_WORKFLOW_THREAD_COUNT; private boolean enableLoggingInReplay; private int stickyPollerCount = DEFAULT_STICKY_POLLER_COUNT; + private ExecutorWrapper executorWrapper = ExecutorWrapper.newDefaultInstance(); private Builder() {} @@ -106,6 +107,11 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + public Builder setExecutorWrapper(ExecutorWrapper executorWrapper) { + this.executorWrapper = executorWrapper; + return this; + } + public WorkerFactoryOptions build() { return new WorkerFactoryOptions( disableStickyExecution, @@ -113,7 +119,8 @@ public WorkerFactoryOptions build() { maxWorkflowThreadCount, stickyTaskScheduleToStartTimeout, stickyPollerCount, - enableLoggingInReplay); + enableLoggingInReplay, + executorWrapper); } } @@ -123,6 +130,7 @@ public WorkerFactoryOptions build() { private Duration stickyTaskScheduleToStartTimeout; private boolean enableLoggingInReplay; private int stickyPollerCount; + private ExecutorWrapper executorWrapper; private WorkerFactoryOptions( boolean disableStickyExecution, @@ -130,7 +138,8 @@ private WorkerFactoryOptions( int maxWorkflowThreadCount, Duration stickyTaskScheduleToStartTimeout, int stickyPollerCount, - boolean enableLoggingInReplay) { + boolean enableLoggingInReplay, + ExecutorWrapper executorWrapper) { Preconditions.checkArgument(cacheMaximumSize > 0, "cacheMaximumSize should be greater than 0"); Preconditions.checkArgument( maxWorkflowThreadCount > 0, "maxWorkflowThreadCount should be greater than 0"); @@ -141,6 +150,7 @@ private WorkerFactoryOptions( this.stickyPollerCount = stickyPollerCount; this.enableLoggingInReplay = enableLoggingInReplay; this.stickyTaskScheduleToStartTimeout = stickyTaskScheduleToStartTimeout; + this.executorWrapper = executorWrapper; } public int getMaxWorkflowThreadCount() { @@ -166,4 +176,8 @@ public int getStickyPollerCount() { public Duration getStickyTaskScheduleToStartTimeout() { return stickyTaskScheduleToStartTimeout; } + + public ExecutorWrapper getExecutorWrapper() { + return executorWrapper; + } }