diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 28a6e358..3650bcbf 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private volatile boolean isNormalShutdown = false; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); @@ -86,6 +87,7 @@ public void start() { * configured. */ public void close() { + this.isNormalShutdown = true; this.shutDownWorkerPool(); this.closeSideCarChannel(); } @@ -136,7 +138,17 @@ public void startAndBlock() { .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) .build(); - this.sidecarClient.completeOrchestratorTask(response); + try { + this.sidecarClient.completeOrchestratorTask(response); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress()); + } + } }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); @@ -169,7 +181,17 @@ public void startAndBlock() { responseBuilder.setFailureDetails(failureDetails); } - this.sidecarClient.completeActivityTask(responseBuilder.build()); + try { + this.sidecarClient.completeActivityTask(responseBuilder.build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress()); + } + } }); } else { logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); @@ -214,7 +236,9 @@ private void closeSideCarChannel() { } private void shutDownWorkerPool() { - logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted"); + if (!this.isNormalShutdown) { + logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); + } this.workerPool.shutdown(); try {