From 43e11e56f1a725aa99afceca97052d092250081f Mon Sep 17 00:00:00 2001 From: siri-varma Date: Tue, 6 May 2025 23:01:12 -0700 Subject: [PATCH 1/5] Add worker thread Signed-off-by: siri-varma --- .../durabletask/DurableTaskGrpcWorker.java | 91 ++++++++++--------- 1 file changed, 49 insertions(+), 42 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 92e2bda4..082ab267 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -13,6 +13,8 @@ import java.time.Duration; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,6 +23,7 @@ * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events. */ public final class DurableTaskGrpcWorker implements AutoCloseable { + private static final int DEFAULT_PORT = 4001; private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); @@ -31,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ManagedChannel managedSidecarChannel; private final DataConverter dataConverter; private final Duration maximumTimerInterval; + private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -61,6 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; + this.workerPool = Executors.newCachedThreadPool(); } /** @@ -130,51 +135,53 @@ public void startAndBlock() { if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); - - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .addAllActions(taskOrchestratorResult.getActions()) - .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) - .build(); - - this.sidecarClient.completeOrchestratorTask(response); + this.workerPool.submit(() -> { + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .addAllActions(taskOrchestratorResult.getActions()) + .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) + .build(); + + this.sidecarClient.completeOrchestratorTask(response); + }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); - // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava - String output = null; - TaskFailureDetails failureDetails = null; - try { - output = taskActivityExecutor.execute( - activityRequest.getName(), - activityRequest.getInput().getValue(), - activityRequest.getTaskId()); - } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); - } - - ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) - .setTaskId(activityRequest.getTaskId()); - - if (output != null) { - responseBuilder.setResult(StringValue.of(output)); - } - - if (failureDetails != null) { - responseBuilder.setFailureDetails(failureDetails); - } - - this.sidecarClient.completeActivityTask(responseBuilder.build()); + this.workerPool.submit(() -> { + String output = null; + TaskFailureDetails failureDetails = null; + try { + output = taskActivityExecutor.execute( + activityRequest.getName(), + activityRequest.getInput().getValue(), + activityRequest.getTaskId()); + } catch (Throwable e) { + failureDetails = TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build(); + } + + ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setTaskId(activityRequest.getTaskId()); + + if (output != null) { + responseBuilder.setResult(StringValue.of(output)); + } + + if (failureDetails != null) { + responseBuilder.setFailureDetails(failureDetails); + } + + this.sidecarClient.completeActivityTask(responseBuilder.build()); + }); } else { logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); } @@ -183,7 +190,7 @@ public void startAndBlock() { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); + logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); } else { logger.log(Level.WARNING, "Unexpected failure connecting to {0}.", this.getSidecarAddress()); } From 9d4bae064ebf20b56513ecc3f44b96d5ced7cf50 Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 09:07:10 -0700 Subject: [PATCH 2/5] Add worker thread Signed-off-by: siri-varma --- .../durabletask/DurableTaskGrpcWorker.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 082ab267..c2fcc0d5 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -86,6 +86,11 @@ public void start() { * configured. */ public void close() { + this.closeSideCarChannel(); + this.shutDownWorkerPool(); + } + + private void closeSideCarChannel() { if (this.managedSidecarChannel != null) { try { this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); @@ -97,10 +102,6 @@ public void close() { } } - private String getSidecarAddress() { - return this.sidecarClient.getChannel().authority(); - } - /** * Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread. * This method call blocks indefinitely, or until the current thread is interrupted. @@ -125,7 +126,7 @@ public void startAndBlock() { logger); // TODO: How do we interrupt manually? - while (true) { + while (!this.workerPool.isShutdown()) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); @@ -211,4 +212,19 @@ public void startAndBlock() { public void stop() { this.close(); } + + private void shutDownWorkerPool() { + this.workerPool.shutdown(); + try { + if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { + this.workerPool.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + private String getSidecarAddress() { + return this.sidecarClient.getChannel().authority(); + } } \ No newline at end of file From 5b2db59e02e125d4118b5e8cad8602afe017962d Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 09:19:36 -0700 Subject: [PATCH 3/5] Add worker thread Signed-off-by: siri-varma --- .../durabletask/DurableTaskGrpcWorker.java | 28 +++++++++---------- .../DurableTaskGrpcWorkerBuilder.java | 15 ++++++++++ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index c2fcc0d5..fb1dfb0c 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -65,7 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = Executors.newCachedThreadPool(); + this.workerPool = builder.executorService; } /** @@ -86,20 +86,8 @@ public void start() { * configured. */ public void close() { - this.closeSideCarChannel(); this.shutDownWorkerPool(); - } - - private void closeSideCarChannel() { - if (this.managedSidecarChannel != null) { - try { - this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Best effort. Also note that AutoClose documentation recommends NOT having - // close() methods throw InterruptedException: - // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html - } - } + this.closeSideCarChannel(); } /** @@ -213,6 +201,18 @@ public void stop() { this.close(); } + private void closeSideCarChannel() { + if (this.managedSidecarChannel != null) { + try { + this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Best effort. Also note that AutoClose documentation recommends NOT having + // close() methods throw InterruptedException: + // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html + } + } + } + private void shutDownWorkerPool() { this.workerPool.shutdown(); try { diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index 3076a7b4..111071c3 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -6,6 +6,9 @@ import java.time.Duration; import java.util.HashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. @@ -17,6 +20,7 @@ public final class DurableTaskGrpcWorkerBuilder { Channel channel; DataConverter dataConverter; Duration maximumTimerInterval; + ExecutorService executorService; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -113,6 +117,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn return this; } + /** + * Sets the executor service that will be used to execute threads. + * + * @param executorService {@link ExecutorService}. + * @return this builder object. + */ + public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object From a05165071434b4708e92d370d8c4cca75a7ee9b5 Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 09:24:00 -0700 Subject: [PATCH 4/5] Add check Signed-off-by: siri-varma --- .../java/com/microsoft/durabletask/DurableTaskGrpcWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index fb1dfb0c..21e86cd4 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -65,7 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = builder.executorService; + this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); } /** From faf6ff75e14bd0cc32f0656830412c9f1c675f9b Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 10:45:51 -0700 Subject: [PATCH 5/5] Added warn Signed-off-by: siri-varma --- .../java/com/microsoft/durabletask/DurableTaskGrpcWorker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 21e86cd4..28a6e358 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -214,6 +214,8 @@ private void closeSideCarChannel() { } private void shutDownWorkerPool() { + logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted"); + this.workerPool.shutdown(); try { if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) {