Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -21,6 +23,7 @@
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
*/
public final class DurableTaskGrpcWorker implements AutoCloseable {

private static final int DEFAULT_PORT = 4001;
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
Expand All @@ -31,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final ManagedChannel managedSidecarChannel;
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final ExecutorService workerPool;

private final TaskHubSidecarServiceBlockingStub sidecarClient;

Expand Down Expand Up @@ -61,6 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
}

/**
Expand All @@ -81,19 +86,8 @@ public void start() {
* configured.
*/
public void close() {
if (this.managedSidecarChannel != null) {
try {
this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Best effort. Also note that AutoClose documentation recommends NOT having
// close() methods throw InterruptedException:
// https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html
}
}
}

private String getSidecarAddress() {
return this.sidecarClient.getChannel().authority();
this.shutDownWorkerPool();
this.closeSideCarChannel();
}

/**
Expand All @@ -120,7 +114,7 @@ public void startAndBlock() {
logger);

// TODO: How do we interrupt manually?
while (true) {
while (!this.workerPool.isShutdown()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering what the side effects of this change might be. For sure we'll need to update the logging code to account for this new exit condition. But does this mean that user code could also accidentally shut down this loop by shutting down the worker pool? Is this change critical?

Copy link
Author

@siri-varma siri-varma May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the workerpool is closed they cannot submit new activities anyway so I thought we could safely come out of the while loop.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, we should at least log a warning.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this resolved? @siri-varma

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Took care of it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
Expand All @@ -130,51 +124,53 @@ public void startAndBlock() {
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();

this.sidecarClient.completeOrchestratorTask(response);
this.workerPool.submit(() -> {
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();

this.sidecarClient.completeOrchestratorTask(response);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a try catch here as well as it can throw StatusRuntimeException

catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure connecting to {0}.", this.getSidecarAddress());
}

});
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
ActivityRequest activityRequest = workItem.getActivityRequest();

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId());

if (output != null) {
responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
}

this.sidecarClient.completeActivityTask(responseBuilder.build());
this.workerPool.submit(() -> {
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId());

if (output != null) {
responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
}

this.sidecarClient.completeActivityTask(responseBuilder.build());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try catch block would be required here as well, otherwise the exception will never be logged and could be difficult to debbug

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. I will do a quick small PR to address this

});
} else {
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
}
Expand All @@ -183,7 +179,7 @@ public void startAndBlock() {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure connecting to {0}.", this.getSidecarAddress());
}
Expand All @@ -204,4 +200,31 @@ public void startAndBlock() {
public void stop() {
this.close();
}

private void closeSideCarChannel() {
if (this.managedSidecarChannel != null) {
try {
this.managedSidecarChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Best effort. Also note that AutoClose documentation recommends NOT having
// close() methods throw InterruptedException:
// https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html
}
}
}

private void shutDownWorkerPool() {
this.workerPool.shutdown();
try {
if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) {
this.workerPool.shutdownNow();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}

private String getSidecarAddress() {
return this.sidecarClient.getChannel().authority();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
Expand All @@ -17,6 +20,7 @@ public final class DurableTaskGrpcWorkerBuilder {
Channel channel;
DataConverter dataConverter;
Duration maximumTimerInterval;
ExecutorService executorService;

/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
Expand Down Expand Up @@ -113,6 +117,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn
return this;
}

/**
* Sets the executor service that will be used to execute threads.
*
* @param executorService {@link ExecutorService}.
* @return this builder object.
*/
public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskGrpcWorker} object
Expand Down