Skip to content

Commit abe6d39

Browse files
committed
refactor(temporal): update worker management to support multiple workers
- Changed the worker property to an array to handle multiple Temporal workers. - Updated shutdown and bootstrap methods to manage multiple workers. - Adjusted worker configuration to accept an array of worker options.
1 parent 0d006b2 commit abe6d39

File tree

2 files changed

+14
-15
lines changed

2 files changed

+14
-15
lines changed

packages/nestjs-temporal/src/temporal.explorer.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ export class TemporalExplorer
4747
@Inject(TEMPORAL_MODULE_OPTIONS_TOKEN)
4848
private readonly options!: TemporalModuleOptions;
4949
private readonly logger = new Logger(TemporalExplorer.name);
50-
private worker?: Worker;
51-
private workerRunPromise?: Promise<void>;
50+
private workers?: Worker[];
51+
private workerRunPromises?: Promise<void>[];
5252

5353
constructor(
5454
private readonly discoveryService: DiscoveryService,
@@ -68,14 +68,14 @@ export class TemporalExplorer
6868
* Shuts down the Temporal worker when the module is destroyed.
6969
*/
7070
async onModuleDestroy(): Promise<void> {
71-
if (!this.worker) {
71+
if (!this.workers) {
7272
return;
7373
}
7474

7575
try {
76-
this.worker.shutdown();
77-
if (this.workerRunPromise) {
78-
await this.worker.run();
76+
this.workers.forEach((worker) => worker.shutdown());
77+
if (this.workerRunPromises) {
78+
await Promise.all(this.workerRunPromises);
7979
}
8080
} catch (err: unknown) {
8181
this.logger.warn("Temporal workers were not cleanly shutdown.", {
@@ -88,8 +88,8 @@ export class TemporalExplorer
8888
* Starts the Temporal worker when the application is fully bootstrapped.
8989
*/
9090
onApplicationBootstrap(): void {
91-
if (this.worker) {
92-
this.workerRunPromise = this.worker.run();
91+
if (this.workers) {
92+
this.workerRunPromises = this.workers.map((worker) => worker.run());
9393
}
9494
}
9595

@@ -103,7 +103,7 @@ export class TemporalExplorer
103103
const connectionOptions = this.getNativeConnectionOptions();
104104

105105
// Worker must have a taskQueue configured
106-
if (!workerConfig.taskQueue) {
106+
if (!workerConfig.some((config) => !config.taskQueue)) {
107107
this.logger.warn(
108108
"Temporal worker configuration missing taskQueue. Worker will not be created.",
109109
);
@@ -130,16 +130,15 @@ export class TemporalExplorer
130130
}
131131

132132
this.logger.verbose("Creating a new Worker");
133-
this.worker = await Worker.create({
134-
...workerConfig,
135-
...workerOptions,
136-
});
133+
this.workers = await Promise.all(
134+
workerConfig.map((config) => Worker.create(config)),
135+
);
137136
}
138137

139138
/**
140139
* Gets the worker configuration options.
141140
*/
142-
getWorkerConfigOptions(): WorkerOptions {
141+
getWorkerConfigOptions(): WorkerOptions[] {
143142
return this.options.workerOptions;
144143
}
145144

packages/nestjs-temporal/src/temporal.module-definition.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
import { ConfigurableModuleBuilder } from "@nestjs/common";
77

88
export interface TemporalModuleOptions {
9-
workerOptions: WorkerOptions;
9+
workerOptions: WorkerOptions[];
1010
connectionOptions?: NativeConnectionOptions;
1111
runtimeOptions?: RuntimeOptions;
1212
activityClasses?: object[];

0 commit comments

Comments
 (0)