1313
1414import java .time .Duration ;
1515import java .util .*;
16+ import java .util .concurrent .ExecutorService ;
17+ import java .util .concurrent .Executors ;
1618import java .util .concurrent .TimeUnit ;
1719import java .util .logging .Level ;
1820import java .util .logging .Logger ;
2123 * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
2224 */
2325public final class DurableTaskGrpcWorker implements AutoCloseable {
26+
2427 private static final int DEFAULT_PORT = 4001 ;
2528 private static final Logger logger = Logger .getLogger (DurableTaskGrpcWorker .class .getPackage ().getName ());
2629 private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration .ofDays (3 );
@@ -31,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3134 private final ManagedChannel managedSidecarChannel ;
3235 private final DataConverter dataConverter ;
3336 private final Duration maximumTimerInterval ;
37+ private final ExecutorService workerPool ;
3438
3539 private final TaskHubSidecarServiceBlockingStub sidecarClient ;
3640
@@ -61,6 +65,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6165 this .sidecarClient = TaskHubSidecarServiceGrpc .newBlockingStub (sidecarGrpcChannel );
6266 this .dataConverter = builder .dataConverter != null ? builder .dataConverter : new JacksonDataConverter ();
6367 this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
68+ this .workerPool = builder .executorService != null ? builder .executorService : Executors .newCachedThreadPool ();
6469 }
6570
6671 /**
@@ -81,19 +86,8 @@ public void start() {
8186 * configured.
8287 */
8388 public void close () {
84- if (this .managedSidecarChannel != null ) {
85- try {
86- this .managedSidecarChannel .shutdownNow ().awaitTermination (5 , TimeUnit .SECONDS );
87- } catch (InterruptedException e ) {
88- // Best effort. Also note that AutoClose documentation recommends NOT having
89- // close() methods throw InterruptedException:
90- // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html
91- }
92- }
93- }
94-
95- private String getSidecarAddress () {
96- return this .sidecarClient .getChannel ().authority ();
89+ this .shutDownWorkerPool ();
90+ this .closeSideCarChannel ();
9791 }
9892
9993 /**
@@ -120,7 +114,7 @@ public void startAndBlock() {
120114 logger );
121115
122116 // TODO: How do we interrupt manually?
123- while (true ) {
117+ while (! this . workerPool . isShutdown () ) {
124118 try {
125119 GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest .newBuilder ().build ();
126120 Iterator <WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
@@ -130,51 +124,53 @@ public void startAndBlock() {
130124 if (requestType == RequestCase .ORCHESTRATORREQUEST ) {
131125 OrchestratorRequest orchestratorRequest = workItem .getOrchestratorRequest ();
132126
133- // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
134127 // TODO: Error handling
135- TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor .execute (
136- orchestratorRequest .getPastEventsList (),
137- orchestratorRequest .getNewEventsList ());
138-
139- OrchestratorResponse response = OrchestratorResponse .newBuilder ()
140- .setInstanceId (orchestratorRequest .getInstanceId ())
141- .addAllActions (taskOrchestratorResult .getActions ())
142- .setCustomStatus (StringValue .of (taskOrchestratorResult .getCustomStatus ()))
143- .build ();
144-
145- this .sidecarClient .completeOrchestratorTask (response );
128+ this .workerPool .submit (() -> {
129+ TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor .execute (
130+ orchestratorRequest .getPastEventsList (),
131+ orchestratorRequest .getNewEventsList ());
132+
133+ OrchestratorResponse response = OrchestratorResponse .newBuilder ()
134+ .setInstanceId (orchestratorRequest .getInstanceId ())
135+ .addAllActions (taskOrchestratorResult .getActions ())
136+ .setCustomStatus (StringValue .of (taskOrchestratorResult .getCustomStatus ()))
137+ .build ();
138+
139+ this .sidecarClient .completeOrchestratorTask (response );
140+ });
146141 } else if (requestType == RequestCase .ACTIVITYREQUEST ) {
147142 ActivityRequest activityRequest = workItem .getActivityRequest ();
148143
149- // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
150- String output = null ;
151- TaskFailureDetails failureDetails = null ;
152- try {
153- output = taskActivityExecutor .execute (
154- activityRequest .getName (),
155- activityRequest .getInput ().getValue (),
156- activityRequest .getTaskId ());
157- } catch (Throwable e ) {
158- failureDetails = TaskFailureDetails .newBuilder ()
159- .setErrorType (e .getClass ().getName ())
160- .setErrorMessage (e .getMessage ())
161- .setStackTrace (StringValue .of (FailureDetails .getFullStackTrace (e )))
162- .build ();
163- }
164-
165- ActivityResponse .Builder responseBuilder = ActivityResponse .newBuilder ()
166- .setInstanceId (activityRequest .getOrchestrationInstance ().getInstanceId ())
167- .setTaskId (activityRequest .getTaskId ());
168-
169- if (output != null ) {
170- responseBuilder .setResult (StringValue .of (output ));
171- }
172-
173- if (failureDetails != null ) {
174- responseBuilder .setFailureDetails (failureDetails );
175- }
176-
177- this .sidecarClient .completeActivityTask (responseBuilder .build ());
144+ this .workerPool .submit (() -> {
145+ String output = null ;
146+ TaskFailureDetails failureDetails = null ;
147+ try {
148+ output = taskActivityExecutor .execute (
149+ activityRequest .getName (),
150+ activityRequest .getInput ().getValue (),
151+ activityRequest .getTaskId ());
152+ } catch (Throwable e ) {
153+ failureDetails = TaskFailureDetails .newBuilder ()
154+ .setErrorType (e .getClass ().getName ())
155+ .setErrorMessage (e .getMessage ())
156+ .setStackTrace (StringValue .of (FailureDetails .getFullStackTrace (e )))
157+ .build ();
158+ }
159+
160+ ActivityResponse .Builder responseBuilder = ActivityResponse .newBuilder ()
161+ .setInstanceId (activityRequest .getOrchestrationInstance ().getInstanceId ())
162+ .setTaskId (activityRequest .getTaskId ());
163+
164+ if (output != null ) {
165+ responseBuilder .setResult (StringValue .of (output ));
166+ }
167+
168+ if (failureDetails != null ) {
169+ responseBuilder .setFailureDetails (failureDetails );
170+ }
171+
172+ this .sidecarClient .completeActivityTask (responseBuilder .build ());
173+ });
178174 } else {
179175 logger .log (Level .WARNING , "Received and dropped an unknown '{0}' work-item from the sidecar." , requestType );
180176 }
@@ -183,7 +179,7 @@ public void startAndBlock() {
183179 if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
184180 logger .log (Level .INFO , "The sidecar at address {0} is unavailable. Will continue retrying." , this .getSidecarAddress ());
185181 } else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
186- logger .log (Level .INFO , "Durable Task worker has disconnected from {0}." , this .getSidecarAddress ());
182+ logger .log (Level .INFO , "Durable Task worker has disconnected from {0}." , this .getSidecarAddress ());
187183 } else {
188184 logger .log (Level .WARNING , "Unexpected failure connecting to {0}." , this .getSidecarAddress ());
189185 }
@@ -204,4 +200,33 @@ public void startAndBlock() {
204200 public void stop () {
205201 this .close ();
206202 }
203+
204+ private void closeSideCarChannel () {
205+ if (this .managedSidecarChannel != null ) {
206+ try {
207+ this .managedSidecarChannel .shutdownNow ().awaitTermination (5 , TimeUnit .SECONDS );
208+ } catch (InterruptedException e ) {
209+ // Best effort. Also note that AutoClose documentation recommends NOT having
210+ // close() methods throw InterruptedException:
211+ // https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html
212+ }
213+ }
214+ }
215+
216+ private void shutDownWorkerPool () {
217+ logger .log (Level .WARNING , "ExecutorService shutdown initiated. No new tasks will be accepted" );
218+
219+ this .workerPool .shutdown ();
220+ try {
221+ if (!this .workerPool .awaitTermination (60 , TimeUnit .SECONDS )) {
222+ this .workerPool .shutdownNow ();
223+ }
224+ } catch (InterruptedException ex ) {
225+ Thread .currentThread ().interrupt ();
226+ }
227+ }
228+
229+ private String getSidecarAddress () {
230+ return this .sidecarClient .getChannel ().authority ();
231+ }
207232}
0 commit comments