20
20
import java .util .logging .Logger ;
21
21
22
22
/**
23
- * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
23
+ * Task hub worker that connects to a sidecar process over gRPC to execute
24
+ * orchestrator and activity events.
24
25
*/
25
26
public final class DurableTaskGrpcWorker implements AutoCloseable {
26
27
@@ -39,8 +40,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
39
40
private final TaskHubSidecarServiceBlockingStub sidecarClient ;
40
41
private final boolean isExecutorServiceManaged ;
41
42
private volatile boolean isNormalShutdown = false ;
42
- private Thread processorThread ;
43
-
43
+ private Thread workerThread ;
44
+
44
45
DurableTaskGrpcWorker (DurableTaskGrpcWorkerBuilder builder ) {
45
46
this .orchestrationFactories .putAll (builder .orchestrationFactories );
46
47
this .activityFactories .putAll (builder .activityFactories );
@@ -67,46 +68,60 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
67
68
68
69
this .sidecarClient = TaskHubSidecarServiceGrpc .newBlockingStub (sidecarGrpcChannel );
69
70
this .dataConverter = builder .dataConverter != null ? builder .dataConverter : new JacksonDataConverter ();
70
- this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
71
+ this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval
72
+ : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
71
73
this .workerPool = builder .executorService != null ? builder .executorService : Executors .newCachedThreadPool ();
72
74
this .isExecutorServiceManaged = builder .executorService == null ;
73
75
}
74
76
75
77
/**
76
- * Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
78
+ * Establishes a gRPC connection to the sidecar and starts processing work-items
79
+ * in the background.
77
80
* <p>
78
- * This method retries continuously to establish a connection to the sidecar. If a connection fails,
79
- * a warning log message will be written and a new connection attempt will be made. This process
80
- * continues until either a connection succeeds or the process receives an interrupt signal.
81
+ * This method retries continuously to establish a connection to the sidecar. If
82
+ * a connection fails,
83
+ * a warning log message will be written and a new connection attempt will be
84
+ * made. This process
85
+ * continues until either a connection succeeds or the process receives an
86
+ * interrupt signal.
81
87
*/
82
88
public void start () {
83
- this .processorThread = new Thread (this ::startAndBlock );
84
- this .processorThread .start ();
89
+ this .workerThread = new Thread (this ::startAndBlock );
90
+ this .workerThread .start ();
85
91
}
86
92
87
93
88
94
/**
89
- * Closes the internally managed gRPC channel and executor service, if one exists.
95
+ * Closes the internally managed gRPC channel and executor service, if one
96
+ * exists.
90
97
* <p>
91
- * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
98
+ * Only the internally managed GRPC Channel and Executor services are closed. If
99
+ * any of them are supplied,
92
100
* it is the responsibility of the supplier to take care of them.
93
101
*/
94
102
public void close () {
103
+ this .workerThread .interrupt ();
95
104
this .isNormalShutdown = true ;
96
105
this .processorThread .interrupt ();
97
106
this .shutDownWorkerPool ();
98
107
this .closeSideCarChannel ();
99
108
}
100
109
101
110
/**
102
- * Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
103
- * This method call blocks indefinitely, or until the current thread is interrupted.
111
+ * Establishes a gRPC connection to the sidecar and starts processing work-items
112
+ * on the current thread.
113
+ * This method call blocks indefinitely, or until the current thread is
114
+ * interrupted.
104
115
* <p>
105
- * Use can alternatively use the {@link #start} method to run orchestration processing in a background thread.
116
+ * Use can alternatively use the {@link #start} method to run orchestration
117
+ * processing in a background thread.
106
118
* <p>
107
- * This method retries continuously to establish a connection to the sidecar. If a connection fails,
108
- * a warning log message will be written and a new connection attempt will be made. This process
109
- * continues until either a connection succeeds or the process receives an interrupt signal.
119
+ * This method retries continuously to establish a connection to the sidecar. If
120
+ * a connection fails,
121
+ * a warning log message will be written and a new connection attempt will be
122
+ * made. This process
123
+ * continues until either a connection succeeds or the process receives an
124
+ * interrupt signal.
110
125
*/
111
126
public void startAndBlock () {
112
127
logger .log (Level .INFO , "Durable Task worker is connecting to sidecar at {0}." , this .getSidecarAddress ());
@@ -121,8 +136,7 @@ public void startAndBlock() {
121
136
this .dataConverter ,
122
137
logger );
123
138
124
- // TODO: How do we interrupt manually?
125
- while (true && !this .isNormalShutdown ) {
139
+ while (true ) {
126
140
try {
127
141
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest .newBuilder ().build ();
128
142
Iterator <WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
@@ -149,11 +163,17 @@ public void startAndBlock() {
149
163
this .sidecarClient .completeOrchestratorTask (response );
150
164
} catch (StatusRuntimeException e ) {
151
165
if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
152
- logger .log (Level .WARNING , "The sidecar at address {0} is unavailable while completing the orchestrator task." , this .getSidecarAddress ());
166
+ logger .log (Level .WARNING ,
167
+ "The sidecar at address {0} is unavailable while completing the orchestrator task." ,
168
+ this .getSidecarAddress ());
153
169
} else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
154
- logger .log (Level .WARNING , "Durable Task worker has disconnected from {0} while completing the orchestrator task." , this .getSidecarAddress ());
170
+ logger .log (Level .WARNING ,
171
+ "Durable Task worker has disconnected from {0} while completing the orchestrator task." ,
172
+ this .getSidecarAddress ());
155
173
} else {
156
- logger .log (Level .WARNING , "Unexpected failure completing the orchestrator task at {0}." , this .getSidecarAddress ());
174
+ logger .log (Level .WARNING ,
175
+ "Unexpected failure completing the orchestrator task at {0}." ,
176
+ this .getSidecarAddress ());
157
177
}
158
178
}
159
179
});
@@ -193,29 +213,35 @@ public void startAndBlock() {
193
213
this .sidecarClient .completeActivityTask (responseBuilder .build ());
194
214
} catch (StatusRuntimeException e ) {
195
215
if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
196
- logger .log (Level .WARNING , "The sidecar at address {0} is unavailable while completing the activity task." , this .getSidecarAddress ());
216
+ logger .log (Level .WARNING ,
217
+ "The sidecar at address {0} is unavailable while completing the activity task." ,
218
+ this .getSidecarAddress ());
197
219
} else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
198
- logger .log (Level .WARNING , "Durable Task worker has disconnected from {0} while completing the activity task." , this .getSidecarAddress ());
220
+ logger .log (Level .WARNING ,
221
+ "Durable Task worker has disconnected from {0} while completing the activity task." ,
222
+ this .getSidecarAddress ());
199
223
} else {
200
- logger .log (Level .WARNING , "Unexpected failure completing the activity task at {0}." , this .getSidecarAddress ());
224
+ logger .log (Level .WARNING , "Unexpected failure completing the activity task at {0}." ,
225
+ this .getSidecarAddress ());
201
226
}
202
227
}
203
228
});
204
- }
205
- else if (requestType == RequestCase .HEALTHPING )
206
- {
229
+ } else if (requestType == RequestCase .HEALTHPING ) {
207
230
// No-op
208
231
} else {
209
- logger .log (Level .WARNING , "Received and dropped an unknown '{0}' work-item from the sidecar." , requestType );
232
+ logger .log (Level .WARNING , "Received and dropped an unknown '{0}' work-item from the sidecar." ,
233
+ requestType );
210
234
}
211
235
}
212
236
} catch (StatusRuntimeException e ) {
213
237
if (e .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
214
- logger .log (Level .INFO , "The sidecar at address {0} is unavailable. Will continue retrying." , this .getSidecarAddress ());
238
+ logger .log (Level .INFO , "The sidecar at address {0} is unavailable. Will continue retrying." ,
239
+ this .getSidecarAddress ());
215
240
} else if (e .getStatus ().getCode () == Status .Code .CANCELLED ) {
216
241
logger .log (Level .INFO , "Durable Task worker has disconnected from {0}." , this .getSidecarAddress ());
217
242
} else {
218
- logger .log (Level .WARNING , String .format ("Unexpected failure connecting to %s" , this .getSidecarAddress ()), e );
243
+ logger .log (Level .WARNING ,
244
+ String .format ("Unexpected failure connecting to %s" , this .getSidecarAddress ()), e );
219
245
}
220
246
221
247
// Retry after 5 seconds
@@ -229,7 +255,8 @@ else if (requestType == RequestCase.HEALTHPING)
229
255
}
230
256
231
257
/**
232
- * Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed.
258
+ * Stops the current worker's listen loop, preventing any new orchestrator or
259
+ * activity events from being processed.
233
260
*/
234
261
public void stop () {
235
262
this .close ();
@@ -250,7 +277,8 @@ private void closeSideCarChannel() {
250
277
private void shutDownWorkerPool () {
251
278
if (this .isExecutorServiceManaged ) {
252
279
if (!this .isNormalShutdown ) {
253
- logger .log (Level .WARNING , "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
280
+ logger .log (Level .WARNING ,
281
+ "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
254
282
}
255
283
256
284
this .workerPool .shutdown ();
0 commit comments