Skip to content

Commit 74ed2cf

Browse files
authored
Merge pull request #4 from cicoyle/try-catch-grpc-worker
try-catch
2 parents fb9c30d + f7ddbba commit 74ed2cf

File tree

1 file changed

+27
-3
lines changed

1 file changed

+27
-3
lines changed

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3737
private final ExecutorService workerPool;
3838

3939
private final TaskHubSidecarServiceBlockingStub sidecarClient;
40+
private volatile boolean isNormalShutdown = false;
4041

4142
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4243
this.orchestrationFactories.putAll(builder.orchestrationFactories);
@@ -86,6 +87,7 @@ public void start() {
8687
* configured.
8788
*/
8889
public void close() {
90+
this.isNormalShutdown = true;
8991
this.shutDownWorkerPool();
9092
this.closeSideCarChannel();
9193
}
@@ -136,7 +138,17 @@ public void startAndBlock() {
136138
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
137139
.build();
138140

139-
this.sidecarClient.completeOrchestratorTask(response);
141+
try {
142+
this.sidecarClient.completeOrchestratorTask(response);
143+
} catch (StatusRuntimeException e) {
144+
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
145+
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress());
146+
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
147+
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress());
148+
} else {
149+
logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress());
150+
}
151+
}
140152
});
141153
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
142154
ActivityRequest activityRequest = workItem.getActivityRequest();
@@ -169,7 +181,17 @@ public void startAndBlock() {
169181
responseBuilder.setFailureDetails(failureDetails);
170182
}
171183

172-
this.sidecarClient.completeActivityTask(responseBuilder.build());
184+
try {
185+
this.sidecarClient.completeActivityTask(responseBuilder.build());
186+
} catch (StatusRuntimeException e) {
187+
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
188+
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress());
189+
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
190+
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress());
191+
} else {
192+
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress());
193+
}
194+
}
173195
});
174196
} else {
175197
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
@@ -214,7 +236,9 @@ private void closeSideCarChannel() {
214236
}
215237

216238
private void shutDownWorkerPool() {
217-
logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted");
239+
if (!this.isNormalShutdown) {
240+
logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
241+
}
218242

219243
this.workerPool.shutdown();
220244
try {

0 commit comments

Comments
 (0)