Skip to content

Commit c37f492

Browse files
Re-do shutdownWorkerFactory design with chain
1 parent 4d7ef86 commit c37f492

File tree

2 files changed

+44
-17
lines changed

2 files changed

+44
-17
lines changed

temporal-sdk/src/main/java/io/temporal/common/plugin/WorkerPlugin.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,28 @@ default void runWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable
156156
}
157157

158158
/**
159-
* Called when the worker factory is shutting down. Plugins are notified in forward (registration)
160-
* order during shutdown.
159+
* Allows the plugin to wrap worker factory shutdown. Called during shutdown phase in reverse
160+
* order (first plugin wraps all others).
161+
*
162+
* <p>This method is called when {@link WorkerFactory#shutdown()} or {@link
163+
* WorkerFactory#shutdownNow()} is invoked. The plugin can perform actions before and after the
164+
* actual shutdown occurs.
161165
*
162-
* <p>This is called during both {@link WorkerFactory#shutdown()} and {@link
163-
* WorkerFactory#shutdownNow()}.
166+
* <p>Example:
167+
*
168+
* <pre>{@code
169+
* @Override
170+
* public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) {
171+
* logger.info("Shutting down workers...");
172+
* next.run();
173+
* logger.info("Workers shut down");
174+
* }
175+
* }</pre>
164176
*
165177
* @param factory the worker factory being shut down
178+
* @param next runnable that shuts down the next in chain (eventually shuts down actual workers)
166179
*/
167-
default void onWorkerFactoryShutdown(@Nonnull WorkerFactory factory) {
168-
// Default: no-op
180+
default void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next) {
181+
next.run();
169182
}
170183
}

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
170170
workflowClient.getOptions().getContextPropagators());
171171
workers.put(taskQueue, worker);
172172

173-
// Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows, activities, etc.)
173+
// Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows,
174+
// activities, etc.)
174175
for (Object plugin : plugins) {
175176
if (plugin instanceof WorkerPlugin) {
176177
((WorkerPlugin) plugin).initializeWorker(taskQueue, worker);
@@ -339,20 +340,33 @@ public synchronized void shutdownNow() {
339340
private void shutdownInternal(boolean interruptUserTasks) {
340341
state = State.Shutdown;
341342

342-
// Notify plugins of shutdown (forward order)
343-
for (Object plugin : plugins) {
343+
// Build plugin shutdown chain (reverse order for proper nesting)
344+
Runnable shutdownChain = () -> doShutdown(interruptUserTasks);
345+
List<Object> reversed = new ArrayList<>(plugins);
346+
Collections.reverse(reversed);
347+
for (Object plugin : reversed) {
344348
if (plugin instanceof WorkerPlugin) {
345-
try {
346-
((WorkerPlugin) plugin).onWorkerFactoryShutdown(this);
347-
} catch (Exception e) {
348-
log.warn(
349-
"Plugin {} failed during shutdown notification",
350-
((WorkerPlugin) plugin).getName(),
351-
e);
352-
}
349+
final Runnable next = shutdownChain;
350+
final WorkerPlugin workerPlugin = (WorkerPlugin) plugin;
351+
shutdownChain =
352+
() -> {
353+
try {
354+
workerPlugin.shutdownWorkerFactory(this, next);
355+
} catch (Exception e) {
356+
log.warn("Plugin {} failed during shutdown", workerPlugin.getName(), e);
357+
// Still try to continue shutdown
358+
next.run();
359+
}
360+
};
353361
}
354362
}
355363

364+
// Execute the chain
365+
shutdownChain.run();
366+
}
367+
368+
/** Internal method that actually shuts down workers. Called from the plugin chain. */
369+
private void doShutdown(boolean interruptUserTasks) {
356370
((WorkflowClientInternal) workflowClient.getInternal()).deregisterWorkerFactory(this);
357371
ShutdownManager shutdownManager = new ShutdownManager();
358372
CompletableFuture.allOf(

0 commit comments

Comments
 (0)