diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 92e2bda4..28a6e358 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -13,6 +13,8 @@ import java.time.Duration; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,6 +23,7 @@ * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events. */ public final class DurableTaskGrpcWorker implements AutoCloseable { + private static final int DEFAULT_PORT = 4001; private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); @@ -31,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ManagedChannel managedSidecarChannel; private final DataConverter dataConverter; private final Duration maximumTimerInterval; + private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -61,6 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; + this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); } /** @@ -81,19 +86,8 @@ public void start() { * configured. */ public void close() { - if (this.managedSidecarChannel != null) { - try { - this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Best effort. Also note that AutoClose documentation recommends NOT having - // close() methods throw InterruptedException: - // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html - } - } - } - - private String getSidecarAddress() { - return this.sidecarClient.getChannel().authority(); + this.shutDownWorkerPool(); + this.closeSideCarChannel(); } /** @@ -120,7 +114,7 @@ public void startAndBlock() { logger); // TODO: How do we interrupt manually? - while (true) { + while (!this.workerPool.isShutdown()) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); @@ -130,51 +124,53 @@ public void startAndBlock() { if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); - - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .addAllActions(taskOrchestratorResult.getActions()) - .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) - .build(); - - this.sidecarClient.completeOrchestratorTask(response); + this.workerPool.submit(() -> { + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .addAllActions(taskOrchestratorResult.getActions()) + .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) + .build(); + + this.sidecarClient.completeOrchestratorTask(response); + }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); - // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava - String output = null; - TaskFailureDetails failureDetails = null; - try { - output = taskActivityExecutor.execute( - activityRequest.getName(), - activityRequest.getInput().getValue(), - activityRequest.getTaskId()); - } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); - } - - ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) - .setTaskId(activityRequest.getTaskId()); - - if (output != null) { - responseBuilder.setResult(StringValue.of(output)); - } - - if (failureDetails != null) { - responseBuilder.setFailureDetails(failureDetails); - } - - this.sidecarClient.completeActivityTask(responseBuilder.build()); + this.workerPool.submit(() -> { + String output = null; + TaskFailureDetails failureDetails = null; + try { + output = taskActivityExecutor.execute( + activityRequest.getName(), + activityRequest.getInput().getValue(), + activityRequest.getTaskId()); + } catch (Throwable e) { + failureDetails = TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build(); + } + + ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setTaskId(activityRequest.getTaskId()); + + if (output != null) { + responseBuilder.setResult(StringValue.of(output)); + } + + if (failureDetails != null) { + responseBuilder.setFailureDetails(failureDetails); + } + + this.sidecarClient.completeActivityTask(responseBuilder.build()); + }); } else { logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); } @@ -183,7 +179,7 @@ public void startAndBlock() { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); + logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); } else { logger.log(Level.WARNING, "Unexpected failure connecting to {0}.", this.getSidecarAddress()); } @@ -204,4 +200,33 @@ public void startAndBlock() { public void stop() { this.close(); } + + private void closeSideCarChannel() { + if (this.managedSidecarChannel != null) { + try { + this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Best effort. Also note that AutoClose documentation recommends NOT having + // close() methods throw InterruptedException: + // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html + } + } + } + + private void shutDownWorkerPool() { + logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted"); + + this.workerPool.shutdown(); + try { + if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { + this.workerPool.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + private String getSidecarAddress() { + return this.sidecarClient.getChannel().authority(); + } } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index 3076a7b4..111071c3 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -6,6 +6,9 @@ import java.time.Duration; import java.util.HashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. @@ -17,6 +20,7 @@ public final class DurableTaskGrpcWorkerBuilder { Channel channel; DataConverter dataConverter; Duration maximumTimerInterval; + ExecutorService executorService; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -113,6 +117,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn return this; } + /** + * Sets the executor service that will be used to execute threads. + * + * @param executorService {@link ExecutorService}. + * @return this builder object. + */ + public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object