diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 00000000..c2537568 --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1,3 @@ +# Enable auto-env through the sdkman_auto_env config +# Add key=value pairs of SDKs to use below +java=11.0.27-tem diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 616fac5a..32ec000a 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -20,7 +20,8 @@ import java.util.logging.Logger; /** - * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events. + * Task hub worker that connects to a sidecar process over gRPC to execute + * orchestrator and activity events. */ public final class DurableTaskGrpcWorker implements AutoCloseable { @@ -39,6 +40,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; + private Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); @@ -66,43 +68,58 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); - this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; + this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval + : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); this.isExecutorServiceManaged = builder.executorService == null; } /** - * Establishes a gRPC connection to the sidecar and starts processing work-items in the background. + * Establishes a gRPC connection to the sidecar and starts processing work-items + * in the background. *
- * This method retries continuously to establish a connection to the sidecar. If a connection fails, - * a warning log message will be written and a new connection attempt will be made. This process - * continues until either a connection succeeds or the process receives an interrupt signal. + * This method retries continuously to establish a connection to the sidecar. If + * a connection fails, + * a warning log message will be written and a new connection attempt will be + * made. This process + * continues until either a connection succeeds or the process receives an + * interrupt signal. */ public void start() { - new Thread(this::startAndBlock).start(); + this.workerThread = new Thread(this::startAndBlock); + this.workerThread.start(); } /** - * Closes the internally managed gRPC channel and executor service, if one exists. + * Closes the internally managed gRPC channel and executor service, if one + * exists. *
- * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied, + * Only the internally managed GRPC Channel and Executor services are closed. If + * any of them are supplied, * it is the responsibility of the supplier to take care of them. */ public void close() { + this.workerThread.interrupt(); this.isNormalShutdown = true; this.shutDownWorkerPool(); this.closeSideCarChannel(); } /** - * Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread. - * This method call blocks indefinitely, or until the current thread is interrupted. + * Establishes a gRPC connection to the sidecar and starts processing work-items + * on the current thread. + * This method call blocks indefinitely, or until the current thread is + * interrupted. *
- * Use can alternatively use the {@link #start} method to run orchestration processing in a background thread. + * Use can alternatively use the {@link #start} method to run orchestration + * processing in a background thread. *
- * This method retries continuously to establish a connection to the sidecar. If a connection fails, - * a warning log message will be written and a new connection attempt will be made. This process - * continues until either a connection succeeds or the process receives an interrupt signal. + * This method retries continuously to establish a connection to the sidecar. If + * a connection fails, + * a warning log message will be written and a new connection attempt will be + * made. This process + * continues until either a connection succeeds or the process receives an + * interrupt signal. */ public void startAndBlock() { logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress()); @@ -117,7 +134,6 @@ public void startAndBlock() { this.dataConverter, logger); - // TODO: How do we interrupt manually? while (true) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); @@ -145,11 +161,17 @@ public void startAndBlock() { this.sidecarClient.completeOrchestratorTask(response); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress()); + logger.log(Level.WARNING, + "The sidecar at address {0} is unavailable while completing the orchestrator task.", + this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress()); + logger.log(Level.WARNING, + "Durable Task worker has disconnected from {0} while completing the orchestrator task.", + this.getSidecarAddress()); } else { - logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress()); + logger.log(Level.WARNING, + "Unexpected failure completing the orchestrator task at {0}.", + this.getSidecarAddress()); } } }); @@ -189,29 +211,35 @@ public void startAndBlock() { this.sidecarClient.completeActivityTask(responseBuilder.build()); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress()); + logger.log(Level.WARNING, + "The sidecar at address {0} is unavailable while completing the activity task.", + this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress()); + logger.log(Level.WARNING, + "Durable Task worker has disconnected from {0} while completing the activity task.", + this.getSidecarAddress()); } else { - logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress()); + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", + this.getSidecarAddress()); } } }); - } - else if (requestType == RequestCase.HEALTHPING) - { + } else if (requestType == RequestCase.HEALTHPING) { // No-op } else { - logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); + logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", + requestType); } } } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress()); + logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", + this.getSidecarAddress()); } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress()); } else { - logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); + logger.log(Level.WARNING, + String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e); } // Retry after 5 seconds @@ -225,7 +253,8 @@ else if (requestType == RequestCase.HEALTHPING) } /** - * Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed. + * Stops the current worker's listen loop, preventing any new orchestrator or + * activity events from being processed. */ public void stop() { this.close(); @@ -246,7 +275,8 @@ private void closeSideCarChannel() { private void shutDownWorkerPool() { if (this.isExecutorServiceManaged) { if (!this.isNormalShutdown) { - logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); + logger.log(Level.WARNING, + "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); } this.workerPool.shutdown();