Skip to content

Commit 4a5aa97

Browse files
authored
Wait for worker slots to be fully released in the graceful worker shutdown (#1679)
1 parent 6156036 commit 4a5aa97

File tree

6 files changed

+223
-67
lines changed

6 files changed

+223
-67
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
import org.slf4j.LoggerFactory;
6363

6464
public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
65+
private static final Logger log =
66+
LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
67+
6568
public static final ImmutableSet<String> WORKFLOW_HANDLER_STACKTRACE_CUTOFF =
6669
ImmutableSet.<String>builder()
6770
// POJO
@@ -71,13 +74,8 @@ public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFa
7174
// Dynamic
7275
.add(
7376
ReflectionUtils.getMethodNameForStackTraceCutoff(
74-
DynamicSyncWorkflowDefinition.RootWorkflowInboundCallsInterceptor.class,
75-
"execute",
76-
WorkflowInboundCallsInterceptor.WorkflowInput.class))
77+
DynamicSyncWorkflowDefinition.class, "execute", Header.class, Optional.class))
7778
.build();
78-
79-
private static final Logger log =
80-
LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
8179
private final WorkerInterceptor[] workerInterceptors;
8280

8381
private final DataConverter dataConverter;

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ final class ActivityWorker implements SuspendableWorker {
6464
private final Scope workerMetricsScope;
6565
private final GrpcRetryer grpcRetryer;
6666
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
67+
private final int executorSlots;
6768
private final Semaphore executorSlotsSemaphore;
6869

6970
public ActivityWorker(
@@ -86,7 +87,8 @@ public ActivityWorker(
8687
this.replyGrpcRetryerOptions =
8788
new GrpcRetryer.GrpcRetryerOptions(
8889
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
89-
this.executorSlotsSemaphore = new Semaphore(options.getTaskExecutorThreadPoolSize());
90+
this.executorSlots = options.getTaskExecutorThreadPoolSize();
91+
this.executorSlotsSemaphore = new Semaphore(executorSlots);
9092
}
9193

9294
@Override
@@ -126,12 +128,33 @@ public boolean start() {
126128

127129
@Override
128130
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
129-
return poller.shutdown(shutdownManager, interruptTasks);
131+
String semaphoreName = this + "#executorSlotsSemaphore";
132+
return poller
133+
.shutdown(shutdownManager, interruptTasks)
134+
.thenCompose(
135+
ignore ->
136+
!interruptTasks
137+
? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
138+
executorSlotsSemaphore, executorSlots, semaphoreName)
139+
: CompletableFuture.completedFuture(null))
140+
.thenCompose(
141+
ignore ->
142+
pollTaskExecutor != null
143+
? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
144+
: CompletableFuture.completedFuture(null))
145+
.exceptionally(
146+
e -> {
147+
log.error("Unexpected exception during shutdown", e);
148+
return null;
149+
});
130150
}
131151

132152
@Override
133153
public void awaitTermination(long timeout, TimeUnit unit) {
134-
poller.awaitTermination(timeout, unit);
154+
long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
155+
// relies on the fact that the pollTaskExecutor is the last one to be shutdown, no need to
156+
// wait separately for intermediate steps
157+
ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
135158
}
136159

137160
@Override
@@ -151,7 +174,7 @@ public boolean isShutdown() {
151174

152175
@Override
153176
public boolean isTerminated() {
154-
return poller.isTerminated();
177+
return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
155178
}
156179

157180
@Override
@@ -180,6 +203,13 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) {
180203
return pollerOptions;
181204
}
182205

206+
@Override
207+
public String toString() {
208+
return String.format(
209+
"ActivityWorker{identity=%s, namespace=%s, taskQueue=%s}",
210+
options.getIdentity(), namespace, taskQueue);
211+
}
212+
183213
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
184214

185215
final ActivityTaskHandler handler;

temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,6 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
134134
// it's ok to forcefully shutdown pollers, especially because they stuck in a long poll call
135135
// we don't lose any progress doing that
136136
.shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1))
137-
// TODO Poller shouldn't shutdown taskExecutor, because it gets it already created
138-
// externally. Creator of taskExecutor should be responsible for it's shutdown.
139-
.thenCompose(ignore -> taskExecutor.shutdown(shutdownManager, interruptTasks))
140137
.exceptionally(
141138
e -> {
142139
log.error("Unexpected exception during shutdown", e);
@@ -154,8 +151,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
154151
}
155152

156153
long timeoutMillis = unit.toMillis(timeout);
157-
timeoutMillis = ShutdownManager.awaitTermination(pollExecutor, timeoutMillis);
158-
ShutdownManager.awaitTermination(taskExecutor, timeoutMillis);
154+
ShutdownManager.awaitTermination(pollExecutor, timeoutMillis);
159155
}
160156

161157
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java

Lines changed: 137 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ public CompletableFuture<Void> shutdownExecutorUntimed(
6868
return untimedWait(executorToShutdown, executorName);
6969
}
7070

71+
public CompletableFuture<Void> waitForSemaphorePermitsReleaseUntimed(
72+
Semaphore semaphore, int initialSemaphorePermits, String semaphoreName) {
73+
CompletableFuture<Void> future = new CompletableFuture<>();
74+
scheduledExecutorService.submit(
75+
new SemaphoreReportingDelayShutdown(
76+
semaphore, initialSemaphorePermits, semaphoreName, future));
77+
return future;
78+
}
79+
7180
/**
7281
* Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
7382
* when the executor is terminated.
@@ -76,7 +85,7 @@ private CompletableFuture<Void> untimedWait(
7685
ExecutorService executorToShutdown, String executorName) {
7786
CompletableFuture<Void> future = new CompletableFuture<>();
7887
scheduledExecutorService.submit(
79-
new ReportingDelayShutdown(executorToShutdown, executorName, future));
88+
new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
8089
return future;
8190
}
8291

@@ -91,7 +100,7 @@ private CompletableFuture<Void> limitedWait(
91100

92101
CompletableFuture<Void> future = new CompletableFuture<>();
93102
scheduledExecutorService.submit(
94-
new LimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
103+
new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
95104
return future;
96105
}
97106

@@ -100,67 +109,90 @@ public void close() {
100109
scheduledExecutorService.shutdownNow();
101110
}
102111

103-
private class LimitedWaitShutdown implements Runnable {
104-
private final ExecutorService executorToShutdown;
112+
private abstract class LimitedWaitShutdown implements Runnable {
105113
private final CompletableFuture<Void> promise;
106114
private final int maxAttempts;
107-
private final String executorName;
108115
private int attempt;
109116

110-
public LimitedWaitShutdown(
111-
ExecutorService executorToShutdown,
112-
int maxAttempts,
113-
String executorName,
114-
CompletableFuture<Void> promise) {
115-
this.executorToShutdown = executorToShutdown;
117+
public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
116118
this.promise = promise;
117119
this.maxAttempts = maxAttempts;
118-
this.executorName = executorName;
119120
}
120121

121122
@Override
122123
public void run() {
123-
if (executorToShutdown.isTerminated()) {
124+
if (isTerminated()) {
125+
onSuccessfulTermination();
124126
promise.complete(null);
125127
return;
126128
}
127129
attempt++;
128130
if (attempt > maxAttempts) {
129-
log.warn(
130-
"Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()",
131-
executorName);
132-
executorToShutdown.shutdownNow();
131+
onAttemptExhaustion();
133132
// we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
134133
// so just log and complete the promise
135134
promise.complete(null);
136135
return;
137136
}
138137
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
139138
}
139+
140+
abstract boolean isTerminated();
141+
142+
abstract void onAttemptExhaustion();
143+
144+
abstract void onSuccessfulTermination();
140145
}
141146

142-
private class ReportingDelayShutdown implements Runnable {
147+
private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
148+
private final ExecutorService executorToShutdown;
149+
private final String executorName;
150+
151+
public ExecutorLimitedWaitShutdown(
152+
ExecutorService executorToShutdown,
153+
int maxAttempts,
154+
String executorName,
155+
CompletableFuture<Void> promise) {
156+
super(maxAttempts, promise);
157+
this.executorToShutdown = executorToShutdown;
158+
this.executorName = executorName;
159+
}
160+
161+
@Override
162+
boolean isTerminated() {
163+
return executorToShutdown.isTerminated();
164+
}
165+
166+
@Override
167+
void onAttemptExhaustion() {
168+
log.warn(
169+
"Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
170+
executorToShutdown.shutdownNow();
171+
}
172+
173+
@Override
174+
void onSuccessfulTermination() {}
175+
}
176+
177+
private abstract class ReportingDelayShutdown implements Runnable {
178+
// measures in attempts count, not in ms
143179
private static final int BLOCKED_REPORTING_THRESHOLD = 60;
144180
private static final int BLOCKED_REPORTING_PERIOD = 20;
145181

146-
private final ExecutorService executorToShutdown;
147182
private final CompletableFuture<Void> promise;
148-
private final String executorName;
149183
private int attempt;
150184

151-
public ReportingDelayShutdown(
152-
ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
153-
this.executorToShutdown = executorToShutdown;
185+
public ReportingDelayShutdown(CompletableFuture<Void> promise) {
154186
this.promise = promise;
155-
this.executorName = executorName;
156187
}
157188

158189
@Override
159190
public void run() {
160-
if (executorToShutdown.isTerminated()) {
191+
if (isTerminated()) {
161192
if (attempt > BLOCKED_REPORTING_THRESHOLD) {
162-
// log warn only if we already logged a shutdown being delayed
163-
log.warn("{} successfully terminated", executorName);
193+
onSlowSuccessfulTermination();
194+
} else {
195+
onSuccessfulTermination();
164196
}
165197
promise.complete(null);
166198
return;
@@ -170,13 +202,87 @@ public void run() {
170202
if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
171203
// and repeat every BLOCKED_REPORTING_PERIOD attempts
172204
if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
173-
log.warn(
174-
"Graceful shutdown of {} is blocked by one of the long currently processing tasks",
175-
executorName);
205+
onSlowTermination();
176206
}
177207
}
178208
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
179209
}
210+
211+
abstract boolean isTerminated();
212+
213+
abstract void onSlowTermination();
214+
215+
abstract void onSuccessfulTermination();
216+
217+
/** Called only if {@link #onSlowTermination()} was called before */
218+
abstract void onSlowSuccessfulTermination();
219+
}
220+
221+
private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
222+
private final ExecutorService executorToShutdown;
223+
private final String executorName;
224+
225+
public ExecutorReportingDelayShutdown(
226+
ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
227+
super(promise);
228+
this.executorToShutdown = executorToShutdown;
229+
this.executorName = executorName;
230+
}
231+
232+
@Override
233+
boolean isTerminated() {
234+
return executorToShutdown.isTerminated();
235+
}
236+
237+
@Override
238+
void onSlowTermination() {
239+
log.warn(
240+
"Graceful shutdown of {} is blocked by one of the long currently processing tasks",
241+
executorName);
242+
}
243+
244+
@Override
245+
void onSuccessfulTermination() {}
246+
247+
@Override
248+
void onSlowSuccessfulTermination() {
249+
log.warn("{} successfully terminated", executorName);
250+
}
251+
}
252+
253+
private class SemaphoreReportingDelayShutdown extends ReportingDelayShutdown {
254+
private final Semaphore semaphore;
255+
private final int initialSemaphorePermits;
256+
private final String semaphoreName;
257+
258+
public SemaphoreReportingDelayShutdown(
259+
Semaphore semaphore,
260+
int initialSemaphorePermits,
261+
String semaphoreName,
262+
CompletableFuture<Void> promise) {
263+
super(promise);
264+
this.semaphore = semaphore;
265+
this.initialSemaphorePermits = initialSemaphorePermits;
266+
this.semaphoreName = semaphoreName;
267+
}
268+
269+
@Override
270+
boolean isTerminated() {
271+
return semaphore.availablePermits() == initialSemaphorePermits;
272+
}
273+
274+
@Override
275+
void onSlowTermination() {
276+
log.warn("Wait for release of slots of {} takes a long time", semaphoreName);
277+
}
278+
279+
@Override
280+
void onSuccessfulTermination() {}
281+
282+
@Override
283+
void onSlowSuccessfulTermination() {
284+
log.warn("All slots of {} were successfully released", semaphoreName);
285+
}
180286
}
181287

182288
public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
@@ -187,7 +293,7 @@ public static long awaitTermination(@Nullable ExecutorService s, long timeoutMil
187293
timeoutMillis,
188294
() -> {
189295
try {
190-
s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
296+
boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
191297
} catch (InterruptedException e) {
192298
Thread.currentThread().interrupt();
193299
}

0 commit comments

Comments
 (0)