Skip to content

Commit cd8f340

Browse files
add startWorker and shutdownWorker
1 parent ad8f70c commit cd8f340

File tree

5 files changed

+427
-7
lines changed

5 files changed

+427
-7
lines changed

temporal-sdk/src/main/java/io/temporal/common/SimplePluginBuilder.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public final class SimplePluginBuilder {
7171
private final List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers = new ArrayList<>();
7272
private final List<Consumer<WorkerOptions.Builder>> workerCustomizers = new ArrayList<>();
7373
private final List<BiConsumer<String, Worker>> workerInitializers = new ArrayList<>();
74+
private final List<BiConsumer<String, Worker>> workerStartCallbacks = new ArrayList<>();
75+
private final List<BiConsumer<String, Worker>> workerShutdownCallbacks = new ArrayList<>();
7476
private final List<WorkerInterceptor> workerInterceptors = new ArrayList<>();
7577
private final List<WorkflowClientInterceptor> clientInterceptors = new ArrayList<>();
7678
private final List<ContextPropagator> contextPropagators = new ArrayList<>();
@@ -164,6 +166,59 @@ public SimplePluginBuilder initializeWorker(@Nonnull BiConsumer<String, Worker>
164166
return this;
165167
}
166168

169+
/**
170+
* Adds a callback that is invoked when a worker starts. This can be used to start per-worker
171+
* resources or record metrics.
172+
*
173+
* <p>Note: For registering workflows and activities, use {@link #initializeWorker} instead, as
174+
* registrations must happen before the worker starts polling.
175+
*
176+
* <p>Example:
177+
*
178+
* <pre>{@code
179+
* SimplePluginBuilder.newBuilder("my-plugin")
180+
* .onWorkerStart((taskQueue, worker) -> {
181+
* logger.info("Worker started for task queue: {}", taskQueue);
182+
* perWorkerResources.put(taskQueue, new ResourcePool());
183+
* })
184+
* .build();
185+
* }</pre>
186+
*
187+
* @param callback a consumer that receives the task queue name and worker when the worker starts
188+
* @return this builder for chaining
189+
*/
190+
public SimplePluginBuilder onWorkerStart(@Nonnull BiConsumer<String, Worker> callback) {
191+
workerStartCallbacks.add(Objects.requireNonNull(callback));
192+
return this;
193+
}
194+
195+
/**
196+
* Adds a callback that is invoked when a worker shuts down. This can be used to clean up
197+
* per-worker resources initialized in {@link #initializeWorker} or {@link #onWorkerStart}.
198+
*
199+
* <p>Example:
200+
*
201+
* <pre>{@code
202+
* SimplePluginBuilder.newBuilder("my-plugin")
203+
* .onWorkerShutdown((taskQueue, worker) -> {
204+
* logger.info("Worker shutting down for task queue: {}", taskQueue);
205+
* ResourcePool pool = perWorkerResources.remove(taskQueue);
206+
* if (pool != null) {
207+
* pool.close();
208+
* }
209+
* })
210+
* .build();
211+
* }</pre>
212+
*
213+
* @param callback a consumer that receives the task queue name and worker when the worker shuts
214+
* down
215+
* @return this builder for chaining
216+
*/
217+
public SimplePluginBuilder onWorkerShutdown(@Nonnull BiConsumer<String, Worker> callback) {
218+
workerShutdownCallbacks.add(Objects.requireNonNull(callback));
219+
return this;
220+
}
221+
167222
/**
168223
* Adds worker interceptors. Interceptors are appended to any existing interceptors in the
169224
* configuration.
@@ -214,6 +269,8 @@ public PluginBase build() {
214269
new ArrayList<>(factoryCustomizers),
215270
new ArrayList<>(workerCustomizers),
216271
new ArrayList<>(workerInitializers),
272+
new ArrayList<>(workerStartCallbacks),
273+
new ArrayList<>(workerShutdownCallbacks),
217274
new ArrayList<>(workerInterceptors),
218275
new ArrayList<>(clientInterceptors),
219276
new ArrayList<>(contextPropagators));
@@ -226,6 +283,8 @@ private static final class SimplePlugin extends PluginBase {
226283
private final List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers;
227284
private final List<Consumer<WorkerOptions.Builder>> workerCustomizers;
228285
private final List<BiConsumer<String, Worker>> workerInitializers;
286+
private final List<BiConsumer<String, Worker>> workerStartCallbacks;
287+
private final List<BiConsumer<String, Worker>> workerShutdownCallbacks;
229288
private final List<WorkerInterceptor> workerInterceptors;
230289
private final List<WorkflowClientInterceptor> clientInterceptors;
231290
private final List<ContextPropagator> contextPropagators;
@@ -237,6 +296,8 @@ private static final class SimplePlugin extends PluginBase {
237296
List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers,
238297
List<Consumer<WorkerOptions.Builder>> workerCustomizers,
239298
List<BiConsumer<String, Worker>> workerInitializers,
299+
List<BiConsumer<String, Worker>> workerStartCallbacks,
300+
List<BiConsumer<String, Worker>> workerShutdownCallbacks,
240301
List<WorkerInterceptor> workerInterceptors,
241302
List<WorkflowClientInterceptor> clientInterceptors,
242303
List<ContextPropagator> contextPropagators) {
@@ -246,6 +307,8 @@ private static final class SimplePlugin extends PluginBase {
246307
this.factoryCustomizers = factoryCustomizers;
247308
this.workerCustomizers = workerCustomizers;
248309
this.workerInitializers = workerInitializers;
310+
this.workerStartCallbacks = workerStartCallbacks;
311+
this.workerShutdownCallbacks = workerShutdownCallbacks;
249312
this.workerInterceptors = workerInterceptors;
250313
this.clientInterceptors = clientInterceptors;
251314
this.contextPropagators = contextPropagators;
@@ -328,5 +391,24 @@ public void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker)
328391
initializer.accept(taskQueue, worker);
329392
}
330393
}
394+
395+
@Override
396+
public void startWorker(
397+
@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next)
398+
throws Exception {
399+
next.run();
400+
for (BiConsumer<String, Worker> callback : workerStartCallbacks) {
401+
callback.accept(taskQueue, worker);
402+
}
403+
}
404+
405+
@Override
406+
public void shutdownWorker(
407+
@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) {
408+
for (BiConsumer<String, Worker> callback : workerShutdownCallbacks) {
409+
callback.accept(taskQueue, worker);
410+
}
411+
next.run();
412+
}
331413
}
332414
}

temporal-sdk/src/main/java/io/temporal/worker/Plugin.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ default WorkerOptions.Builder configureWorker(
106106
* services, and other components on the worker.
107107
*
108108
* <p>This method is called in forward (registration) order immediately after the worker is
109-
* created in {@link WorkerFactory#newWorker}.
109+
* created in {@link WorkerFactory#newWorker}. This is the appropriate place for registrations
110+
* because it is called before the worker starts polling.
110111
*
111112
* <p>Example:
112113
*
@@ -125,6 +126,68 @@ default void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker)
125126
// Default: no-op
126127
}
127128

129+
/**
130+
* Allows the plugin to wrap individual worker startup. Called during execution phase in reverse
131+
* order (first plugin wraps all others) when {@link WorkerFactory#start()} is invoked.
132+
*
133+
* <p>This method is called for each worker when the factory starts. Use this for per-worker
134+
* resource initialization, logging, or metrics. Note that workflow/activity registration should
135+
* be done in {@link #initializeWorker} instead, as this method is called after registrations are
136+
* finalized.
137+
*
138+
* <p>Example:
139+
*
140+
* <pre>{@code
141+
* @Override
142+
* public void startWorker(String taskQueue, Worker worker, Runnable next) throws Exception {
143+
* logger.info("Starting worker for task queue: {}", taskQueue);
144+
* perWorkerResources.put(taskQueue, new ResourcePool());
145+
* next.run();
146+
* }
147+
* }</pre>
148+
*
149+
* @param taskQueue the task queue name for the worker
150+
* @param worker the worker being started
151+
* @param next runnable that starts the next in chain (eventually starts the actual worker)
152+
* @throws Exception if startup fails
153+
*/
154+
default void startWorker(
155+
@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) throws Exception {
156+
next.run();
157+
}
158+
159+
/**
160+
* Allows the plugin to wrap individual worker shutdown. Called during shutdown phase in reverse
161+
* order (first plugin wraps all others) when {@link WorkerFactory#shutdown()} or {@link
162+
* WorkerFactory#shutdownNow()} is invoked.
163+
*
164+
* <p>This method is called for each worker when the factory shuts down. Use this for per-worker
165+
* resource cleanup that was initialized in {@link #startWorker} or {@link #initializeWorker}.
166+
*
167+
* <p>Example:
168+
*
169+
* <pre>{@code
170+
* @Override
171+
* public void shutdownWorker(String taskQueue, Worker worker, Runnable next) {
172+
* logger.info("Shutting down worker for task queue: {}", taskQueue);
173+
* next.run();
174+
* ResourcePool pool = perWorkerResources.remove(taskQueue);
175+
* if (pool != null) {
176+
* pool.close();
177+
* }
178+
* }
179+
* }</pre>
180+
*
181+
* @param taskQueue the task queue name for the worker
182+
* @param worker the worker being shut down
183+
* @param next runnable that shuts down the next in chain (eventually shuts down the actual
184+
* worker)
185+
*/
186+
default void shutdownWorker(
187+
@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) {
188+
next.run();
189+
}
190+
128191
/**
129192
* Allows the plugin to wrap worker factory startup. Called during execution phase in reverse
130193
* order (first plugin wraps all others).

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,39 @@ public synchronized void start() {
263263

264264
/** Internal method that actually starts the workers. Called from the plugin chain. */
265265
private void doStart() {
266-
for (Worker worker : workers.values()) {
267-
worker.start();
266+
// Start each worker with plugin hooks
267+
for (Map.Entry<String, Worker> entry : workers.entrySet()) {
268+
String taskQueue = entry.getKey();
269+
Worker worker = entry.getValue();
270+
271+
// Build plugin chain for this worker (reverse order for proper nesting)
272+
Runnable startChain = worker::start;
273+
List<Object> reversed = new ArrayList<>(plugins);
274+
Collections.reverse(reversed);
275+
for (Object plugin : reversed) {
276+
if (plugin instanceof Plugin) {
277+
final Runnable next = startChain;
278+
final Plugin workerPlugin = (Plugin) plugin;
279+
startChain =
280+
() -> {
281+
try {
282+
workerPlugin.startWorker(taskQueue, worker, next);
283+
} catch (RuntimeException e) {
284+
throw e;
285+
} catch (Exception e) {
286+
throw new RuntimeException(
287+
"Plugin "
288+
+ workerPlugin.getName()
289+
+ " failed during worker startup for task queue "
290+
+ taskQueue,
291+
e);
292+
}
293+
};
294+
}
295+
}
296+
297+
// Execute the chain for this worker
298+
startChain.run();
268299
}
269300

270301
state = State.Started;
@@ -368,10 +399,51 @@ private void shutdownInternal(boolean interruptUserTasks) {
368399
private void doShutdown(boolean interruptUserTasks) {
369400
((WorkflowClientInternal) workflowClient.getInternal()).deregisterWorkerFactory(this);
370401
ShutdownManager shutdownManager = new ShutdownManager();
371-
CompletableFuture.allOf(
372-
workers.values().stream()
373-
.map(worker -> worker.shutdown(shutdownManager, interruptUserTasks))
374-
.toArray(CompletableFuture[]::new))
402+
403+
// Shutdown each worker with plugin hooks
404+
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>();
405+
for (Map.Entry<String, Worker> entry : workers.entrySet()) {
406+
String taskQueue = entry.getKey();
407+
Worker worker = entry.getValue();
408+
409+
// Build plugin chain for this worker's shutdown (reverse order for proper nesting)
410+
// We use a holder to capture the future from the terminal action
411+
@SuppressWarnings("unchecked")
412+
CompletableFuture<Void>[] futureHolder = new CompletableFuture[1];
413+
Runnable shutdownChain =
414+
() -> futureHolder[0] = worker.shutdown(shutdownManager, interruptUserTasks);
415+
416+
List<Object> reversed = new ArrayList<>(plugins);
417+
Collections.reverse(reversed);
418+
for (Object plugin : reversed) {
419+
if (plugin instanceof Plugin) {
420+
final Runnable next = shutdownChain;
421+
final Plugin workerPlugin = (Plugin) plugin;
422+
shutdownChain =
423+
() -> {
424+
try {
425+
workerPlugin.shutdownWorker(taskQueue, worker, next);
426+
} catch (Exception e) {
427+
log.warn(
428+
"Plugin {} failed during worker shutdown for task queue {}",
429+
workerPlugin.getName(),
430+
taskQueue,
431+
e);
432+
// Still try to continue shutdown
433+
next.run();
434+
}
435+
};
436+
}
437+
}
438+
439+
// Execute the shutdown chain for this worker
440+
shutdownChain.run();
441+
if (futureHolder[0] != null) {
442+
shutdownFutures.add(futureHolder[0]);
443+
}
444+
}
445+
446+
CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0]))
375447
.thenApply(
376448
r -> {
377449
cache.invalidateAll();

0 commit comments

Comments
 (0)