Skip to content

Commit 4d7ef86

Browse files
Add initializeWorker in plugin interface.
1 parent ac87f44 commit 4d7ef86

File tree

5 files changed

+115
-0
lines changed

5 files changed

+115
-0
lines changed

temporal-sdk/src/main/java/io/temporal/common/plugin/SimplePluginBuilder.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import io.temporal.common.interceptors.WorkerInterceptor;
2727
import io.temporal.common.interceptors.WorkflowClientInterceptor;
2828
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
29+
import io.temporal.worker.Worker;
2930
import io.temporal.worker.WorkerFactoryOptions;
3031
import io.temporal.worker.WorkerOptions;
3132
import java.util.ArrayList;
3233
import java.util.Arrays;
3334
import java.util.List;
3435
import java.util.Objects;
36+
import java.util.function.BiConsumer;
3537
import java.util.function.Consumer;
3638
import javax.annotation.Nonnull;
3739

@@ -69,6 +71,7 @@ public final class SimplePluginBuilder {
6971
private final List<Consumer<WorkflowClientOptions.Builder>> clientCustomizers = new ArrayList<>();
7072
private final List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers = new ArrayList<>();
7173
private final List<Consumer<WorkerOptions.Builder>> workerCustomizers = new ArrayList<>();
74+
private final List<BiConsumer<String, Worker>> workerInitializers = new ArrayList<>();
7275
private final List<WorkerInterceptor> workerInterceptors = new ArrayList<>();
7376
private final List<WorkflowClientInterceptor> clientInterceptors = new ArrayList<>();
7477
private final List<ContextPropagator> contextPropagators = new ArrayList<>();
@@ -139,6 +142,29 @@ public SimplePluginBuilder customizeWorker(@Nonnull Consumer<WorkerOptions.Build
139142
return this;
140143
}
141144

145+
/**
146+
* Adds an initializer that is called after a worker is created. This can be used to register
147+
* workflows, activities, and Nexus services on the worker.
148+
*
149+
* <p>Example:
150+
*
151+
* <pre>{@code
152+
* SimplePluginBuilder.newBuilder("my-plugin")
153+
* .initializeWorker((taskQueue, worker) -> {
154+
* worker.registerWorkflowImplementationTypes(MyWorkflow.class);
155+
* worker.registerActivitiesImplementations(new MyActivityImpl());
156+
* })
157+
* .build();
158+
* }</pre>
159+
*
160+
* @param initializer a consumer that receives the task queue name and worker
161+
* @return this builder for chaining
162+
*/
163+
public SimplePluginBuilder initializeWorker(@Nonnull BiConsumer<String, Worker> initializer) {
164+
workerInitializers.add(Objects.requireNonNull(initializer));
165+
return this;
166+
}
167+
142168
/**
143169
* Adds worker interceptors. Interceptors are appended to any existing interceptors in the
144170
* configuration.
@@ -188,6 +214,7 @@ public PluginBase build() {
188214
new ArrayList<>(clientCustomizers),
189215
new ArrayList<>(factoryCustomizers),
190216
new ArrayList<>(workerCustomizers),
217+
new ArrayList<>(workerInitializers),
191218
new ArrayList<>(workerInterceptors),
192219
new ArrayList<>(clientInterceptors),
193220
new ArrayList<>(contextPropagators));
@@ -199,6 +226,7 @@ private static final class SimplePlugin extends PluginBase {
199226
private final List<Consumer<WorkflowClientOptions.Builder>> clientCustomizers;
200227
private final List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers;
201228
private final List<Consumer<WorkerOptions.Builder>> workerCustomizers;
229+
private final List<BiConsumer<String, Worker>> workerInitializers;
202230
private final List<WorkerInterceptor> workerInterceptors;
203231
private final List<WorkflowClientInterceptor> clientInterceptors;
204232
private final List<ContextPropagator> contextPropagators;
@@ -209,6 +237,7 @@ private static final class SimplePlugin extends PluginBase {
209237
List<Consumer<WorkflowClientOptions.Builder>> clientCustomizers,
210238
List<Consumer<WorkerFactoryOptions.Builder>> factoryCustomizers,
211239
List<Consumer<WorkerOptions.Builder>> workerCustomizers,
240+
List<BiConsumer<String, Worker>> workerInitializers,
212241
List<WorkerInterceptor> workerInterceptors,
213242
List<WorkflowClientInterceptor> clientInterceptors,
214243
List<ContextPropagator> contextPropagators) {
@@ -217,6 +246,7 @@ private static final class SimplePlugin extends PluginBase {
217246
this.clientCustomizers = clientCustomizers;
218247
this.factoryCustomizers = factoryCustomizers;
219248
this.workerCustomizers = workerCustomizers;
249+
this.workerInitializers = workerInitializers;
220250
this.workerInterceptors = workerInterceptors;
221251
this.clientInterceptors = clientInterceptors;
222252
this.contextPropagators = contextPropagators;
@@ -292,5 +322,12 @@ public WorkerOptions.Builder configureWorker(
292322
}
293323
return builder;
294324
}
325+
326+
@Override
327+
public void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker) {
328+
for (BiConsumer<String, Worker> initializer : workerInitializers) {
329+
initializer.accept(taskQueue, worker);
330+
}
331+
}
295332
}
296333
}

temporal-sdk/src/main/java/io/temporal/common/plugin/WorkerPlugin.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.common.plugin;
2222

2323
import io.temporal.common.Experimental;
24+
import io.temporal.worker.Worker;
2425
import io.temporal.worker.WorkerFactory;
2526
import io.temporal.worker.WorkerFactoryOptions;
2627
import io.temporal.worker.WorkerOptions;
@@ -103,6 +104,30 @@ default WorkerOptions.Builder configureWorker(
103104
return builder;
104105
}
105106

107+
/**
108+
* Called after a worker is created, allowing plugins to register workflows, activities, Nexus
109+
* services, and other components on the worker.
110+
*
111+
* <p>This method is called in forward (registration) order immediately after the worker is
112+
* created in {@link WorkerFactory#newWorker}.
113+
*
114+
* <p>Example:
115+
*
116+
* <pre>{@code
117+
* @Override
118+
* public void initializeWorker(String taskQueue, Worker worker) {
119+
* worker.registerWorkflowImplementationTypes(MyWorkflow.class);
120+
* worker.registerActivitiesImplementations(new MyActivityImpl());
121+
* }
122+
* }</pre>
123+
*
124+
* @param taskQueue the task queue name for the worker
125+
* @param worker the newly created worker
126+
*/
127+
default void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker) {
128+
// Default: no-op
129+
}
130+
106131
/**
107132
* Allows the plugin to wrap worker factory startup. Called during execution phase in reverse
108133
* order (first plugin wraps all others).

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
169169
workflowThreadExecutor,
170170
workflowClient.getOptions().getContextPropagators());
171171
workers.put(taskQueue, worker);
172+
173+
// Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows, activities, etc.)
174+
for (Object plugin : plugins) {
175+
if (plugin instanceof WorkerPlugin) {
176+
((WorkerPlugin) plugin).initializeWorker(taskQueue, worker);
177+
}
178+
}
179+
172180
return worker;
173181
} else {
174182
log.warn(

temporal-sdk/src/test/java/io/temporal/common/plugin/PluginTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ public String getName() {
9797
final boolean[] called = {false};
9898
plugin.runWorkerFactory(null, () -> called[0] = true);
9999
assertTrue("runWorkerFactory should call next", called[0]);
100+
101+
// Test default initializeWorker is a no-op (doesn't throw)
102+
plugin.initializeWorker("test-queue", null);
100103
}
101104

102105
@Test

temporal-sdk/src/test/java/io/temporal/common/plugin/SimplePluginBuilderTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,48 @@ public void testInterceptorsAppendToExisting() {
192192
assertSame(newInterceptor, interceptors[1]);
193193
}
194194

195+
@Test
196+
public void testInitializeWorker() {
197+
AtomicBoolean initialized = new AtomicBoolean(false);
198+
String[] capturedTaskQueue = {null};
199+
200+
PluginBase plugin =
201+
SimplePluginBuilder.newBuilder("test")
202+
.initializeWorker(
203+
(taskQueue, worker) -> {
204+
initialized.set(true);
205+
capturedTaskQueue[0] = taskQueue;
206+
})
207+
.build();
208+
209+
// Call initializeWorker with null worker (we're just testing the callback is invoked)
210+
((WorkerPlugin) plugin).initializeWorker("my-task-queue", null);
211+
212+
assertTrue("Initializer should have been called", initialized.get());
213+
assertEquals("my-task-queue", capturedTaskQueue[0]);
214+
}
215+
216+
@Test
217+
public void testMultipleWorkerInitializers() {
218+
AtomicInteger callCount = new AtomicInteger(0);
219+
220+
PluginBase plugin =
221+
SimplePluginBuilder.newBuilder("test")
222+
.initializeWorker((taskQueue, worker) -> callCount.incrementAndGet())
223+
.initializeWorker((taskQueue, worker) -> callCount.incrementAndGet())
224+
.initializeWorker((taskQueue, worker) -> callCount.incrementAndGet())
225+
.build();
226+
227+
((WorkerPlugin) plugin).initializeWorker("test-queue", null);
228+
229+
assertEquals("All initializers should be called", 3, callCount.get());
230+
}
231+
232+
@Test(expected = NullPointerException.class)
233+
public void testNullInitializeWorker() {
234+
SimplePluginBuilder.newBuilder("test").initializeWorker(null);
235+
}
236+
195237
@Test(expected = NullPointerException.class)
196238
public void testNullName() {
197239
SimplePluginBuilder.newBuilder(null);

0 commit comments

Comments
 (0)