Skip to content

Commit d356504

Browse files
committed
feat(decorator): add OnWorkerEvent
1 parent 682140d commit d356504

15 files changed

+404
-47
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ Why you should prefer Graphile Worker instead of [Bull](https://github.com/nestj
1010

1111
1. You already have a PostgreSQL in your stack (and you don't want to add a Redis server)
1212

13+
## Features
14+
15+
- use a `GraphileWorkerModule` to register Graphile Worker with a `asRootAsync` to pass dynamic parameters
16+
- provide a `GraphileWorkerService` to add jobs or start runner
17+
- provide a `OnWorkerEvenet` decorator to add custom behavior on `job:success` for example
18+
1319
## Installation
1420

1521
```bash
@@ -121,6 +127,30 @@ async function bootstrap() {
121127
bootstrap();
122128
```
123129

130+
## `OnWorkerEvent` decorator
131+
132+
You need to add `@GraphileWorkerListener` decorator on your class and then set `@OnWorkerEvent(eventName)` on method:
133+
134+
```ts
135+
import { GraphileWorkerListener, OnWorkerEvent } from '@app/graphile-worker';
136+
import { Injectable, Logger } from '@nestjs/common';
137+
import { WorkerEventMap } from 'graphile-worker';
138+
139+
@Injectable()
140+
@GraphileWorkerListener()
141+
export class AppService {
142+
private readonly logger = new Logger(AppService.name);
143+
144+
@OnWorkerEvent('job:success')
145+
onJobSuccess({ job }: WorkerEventMap['job:success']) {
146+
this.logger.debug(`job #${job.id} finished`);
147+
// output: [Nest] 1732 - 09/14/2021, 12:42:45 PM DEBUG [AppService] job #349 finished
148+
}
149+
}
150+
```
151+
152+
You can find a complete list of available event on [Graphile Worker's documentation](https://github.com/graphile/worker#workerevents).
153+
124154
## Test
125155

126156
```bash

libs/graphile-worker/src/config.interface.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { FactoryProvider, ModuleMetadata } from '@nestjs/common';
22
import { RunnerOptions } from 'graphile-worker';
33

4-
export type GraphileWorkerConfiguration = RunnerOptions;
4+
export type RunnerOptionWithoutEvents = Omit<RunnerOptions, 'events'>;
55

66
export interface GraphileWorkerConfigurationFactory {
77
createSharedConfiguration():
8-
| Promise<GraphileWorkerConfiguration>
9-
| GraphileWorkerConfiguration;
8+
| Promise<RunnerOptionWithoutEvents>
9+
| RunnerOptionWithoutEvents;
1010
}
1111

1212
export interface GraphileWorkerAsyncConfiguration
@@ -26,7 +26,7 @@ export interface GraphileWorkerAsyncConfiguration
2626
*/
2727
useFactory?: (
2828
...args: any[]
29-
) => Promise<GraphileWorkerConfiguration> | GraphileWorkerConfiguration;
29+
) => Promise<RunnerOptionWithoutEvents> | RunnerOptionWithoutEvents;
3030

3131
/**
3232
* Optional list of providers to be injected into the context of the Factory function.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { EventEmitter } from 'events';
3+
import { RunnerOptions } from 'graphile-worker';
4+
5+
@Injectable()
6+
export class ConfigurationService {
7+
constructor(public readonly config: RunnerOptions) {
8+
const events = new EventEmitter();
9+
10+
this.config.events = events;
11+
}
12+
}
13+
14+
// TODO: try to remove this ?
15+
export const CONFIGURATION_SERVICE_KEY = Symbol.for(ConfigurationService.name);

libs/graphile-worker/src/graphile-worker.module.ts

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,28 @@
11
import { DynamicModule, Module, Provider } from '@nestjs/common';
2+
import { DiscoveryModule } from '@nestjs/core';
23
import {
34
GraphileWorkerAsyncConfiguration,
4-
GraphileWorkerConfiguration,
55
GraphileWorkerConfigurationFactory,
6+
RunnerOptionWithoutEvents,
67
} from './config.interface';
8+
import {
9+
ConfigurationService,
10+
CONFIGURATION_SERVICE_KEY,
11+
} from './configuration.service';
712
import { GraphileWorkerService } from './graphile-worker.service';
13+
import { ListenerExplorerService } from './listener-explorer.service';
14+
import { MetadataAccessorService } from './metadata-accessor.service';
815

916
export const GRAPHILE_WORKER_TOKEN = Symbol.for('NestJsGraphileWorker');
1017

18+
const internalsProviders = [
19+
MetadataAccessorService,
20+
ListenerExplorerService,
21+
GraphileWorkerService,
22+
];
23+
24+
const internalsModules = [DiscoveryModule];
25+
1126
@Module({
1227
providers: [GraphileWorkerService],
1328
exports: [GraphileWorkerService],
@@ -27,17 +42,20 @@ export class GraphileWorkerModule {
2742
* }),
2843
* ```
2944
*/
30-
static forRoot(config: GraphileWorkerConfiguration): DynamicModule {
31-
const graphileWorkerService: Provider = {
32-
provide: GraphileWorkerService,
33-
useValue: new GraphileWorkerService(config),
45+
static forRoot(config: RunnerOptionWithoutEvents): DynamicModule {
46+
const configurationService = new ConfigurationService(config);
47+
48+
const graphileConfigurationServiceProvider: Provider = {
49+
provide: CONFIGURATION_SERVICE_KEY,
50+
useValue: configurationService,
3451
};
3552

3653
return {
3754
global: true,
55+
imports: internalsModules,
3856
module: GraphileWorkerModule,
39-
providers: [graphileWorkerService],
40-
exports: [graphileWorkerService],
57+
providers: [graphileConfigurationServiceProvider, ...internalsProviders],
58+
exports: [GraphileWorkerService],
4159
};
4260
}
4361

@@ -67,8 +85,8 @@ export class GraphileWorkerModule {
6785
return {
6886
global: true,
6987
module: GraphileWorkerModule,
70-
imports: asyncConfig.imports,
71-
providers,
88+
imports: [...asyncConfig.imports, ...internalsModules],
89+
providers: [...providers, ...internalsProviders],
7290
exports: providers,
7391
};
7492
}
@@ -84,23 +102,22 @@ export class GraphileWorkerModule {
84102
): Provider {
85103
if (options.useFactory) {
86104
return {
87-
provide: GraphileWorkerService,
105+
provide: CONFIGURATION_SERVICE_KEY,
88106
useFactory: async (...args: any[]) => {
89-
const configuration = await options.useFactory(...args);
90-
return new GraphileWorkerService(configuration);
107+
const config = await options.useFactory(...args);
108+
return new ConfigurationService(config);
91109
},
92110
inject: options.inject || [],
93111
};
94112
}
95113

96114
return {
97-
provide: GraphileWorkerService,
115+
provide: CONFIGURATION_SERVICE_KEY,
98116
useFactory: async (
99117
optionsFactory: GraphileWorkerConfigurationFactory,
100118
) => {
101-
const configuration = await optionsFactory.createSharedConfiguration();
102-
103-
return new GraphileWorkerService(configuration);
119+
const config = await optionsFactory.createSharedConfiguration();
120+
return new ConfigurationService(config);
104121
},
105122
inject: options.inject,
106123
};

libs/graphile-worker/src/graphile-worker.service.spec.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,35 @@
11
import { Test, TestingModule } from '@nestjs/testing';
2+
import {
3+
ConfigurationService,
4+
CONFIGURATION_SERVICE_KEY,
5+
} from './configuration.service';
26
import { GraphileWorkerService } from './graphile-worker.service';
7+
import { ListenerExplorerService } from './listener-explorer.service';
38

4-
describe('GraphileWorkerService', () => {
9+
describe(GraphileWorkerService.name, () => {
510
let service: GraphileWorkerService;
611

712
beforeEach(async () => {
813
const module: TestingModule = await Test.createTestingModule({
9-
providers: [GraphileWorkerService],
14+
providers: [
15+
GraphileWorkerService,
16+
{
17+
provide: ListenerExplorerService,
18+
useValue: {
19+
listeners: [],
20+
},
21+
},
22+
{
23+
useFactory: () => {
24+
const configurationService = new ConfigurationService({
25+
taskList: { hello: () => {} },
26+
connectionString: 'postgres://example:password@postgres/example',
27+
});
28+
return configurationService;
29+
},
30+
provide: CONFIGURATION_SERVICE_KEY,
31+
},
32+
],
1033
}).compile();
1134

1235
service = module.get<GraphileWorkerService>(GraphileWorkerService);

libs/graphile-worker/src/graphile-worker.service.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Injectable, Logger } from '@nestjs/common';
1+
import { Inject, Injectable, Logger } from '@nestjs/common';
22
import {
33
Job,
44
makeWorkerUtils,
@@ -8,13 +8,31 @@ import {
88
TaskSpec,
99
WorkerUtils,
1010
} from 'graphile-worker';
11+
import {
12+
ConfigurationService,
13+
CONFIGURATION_SERVICE_KEY,
14+
} from './configuration.service';
15+
import { ListenerExplorerService } from './listener-explorer.service';
1116

1217
@Injectable()
1318
export class GraphileWorkerService {
1419
private readonly logger = new Logger(GraphileWorkerService.name);
1520
private isMigrationDone: boolean;
21+
private readonly options: RunnerOptions;
22+
23+
constructor(
24+
@Inject(CONFIGURATION_SERVICE_KEY)
25+
configuration: ConfigurationService,
26+
private readonly explorerService: ListenerExplorerService,
27+
) {
28+
configuration.config.events.on('job:success', (...args: any[]) => {
29+
this.explorerService.listeners
30+
.filter(({ event }) => event === 'job:success')
31+
.forEach(({ callback }) => callback(...args));
32+
});
1633

17-
constructor(private readonly options: RunnerOptions) {}
34+
this.options = configuration.config;
35+
}
1836

1937
/**
2038
* Runs until either stopped by a signal event like `SIGINT` or by calling the `stop()` method on the resolved object.

libs/graphile-worker/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './graphile-worker.module';
22
export * from './graphile-worker.service';
3+
export * from './worker-hooks.decorators';
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { DiscoveryModule } from '@nestjs/core';
3+
import { Test, TestingModule } from '@nestjs/testing';
4+
import { GraphileWorkerListener, OnWorkerEvent } from '.';
5+
import { ListenerExplorerService } from './listener-explorer.service';
6+
import { MetadataAccessorService } from './metadata-accessor.service';
7+
8+
@Injectable()
9+
@GraphileWorkerListener()
10+
class TestListenerService {
11+
@OnWorkerEvent('job:success')
12+
onJobSuccess() {}
13+
14+
@OnWorkerEvent('job:error')
15+
onJobError() {}
16+
}
17+
18+
describe(ListenerExplorerService.name, () => {
19+
let service: ListenerExplorerService;
20+
21+
beforeEach(async () => {
22+
const module: TestingModule = await Test.createTestingModule({
23+
imports: [DiscoveryModule],
24+
providers: [
25+
ListenerExplorerService,
26+
MetadataAccessorService,
27+
TestListenerService,
28+
],
29+
}).compile();
30+
31+
service = module.get<ListenerExplorerService>(ListenerExplorerService);
32+
});
33+
34+
it('should be defined', () => {
35+
expect(service).toBeDefined();
36+
});
37+
38+
describe('explore', () => {
39+
it('should register TestListenerService', () => {
40+
service.explore();
41+
expect(service.listeners).toHaveLength(2);
42+
43+
const eventsRegistered = service.listeners.map((l) => l.event);
44+
45+
expect(eventsRegistered).toContain('job:success');
46+
expect(eventsRegistered).toContain('job:error');
47+
});
48+
});
49+
});
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
2+
import { DiscoveryService } from '@nestjs/core';
3+
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
4+
import { MetadataScanner } from '@nestjs/core/metadata-scanner';
5+
import { MetadataAccessorService } from './metadata-accessor.service';
6+
import { WorkerEventName } from './worker-hooks.decorators';
7+
8+
/**
9+
* This service is responsible to scan all [Graphile worker decorators](./worker-hooks.decorators.ts) and register them.
10+
*
11+
* Heavily inspired from [`BullExplorer`](https://github.com/nestjs/bull/blob/c230eab1dc26fb743a3428e61043167866b1e377/lib/bull.explorer.ts)
12+
*/
13+
@Injectable()
14+
export class ListenerExplorerService implements OnModuleInit {
15+
private readonly logger = new Logger(ListenerExplorerService.name);
16+
17+
public readonly listeners: { event: WorkerEventName; callback: Function }[] =
18+
[];
19+
20+
constructor(
21+
private readonly discoveryService: DiscoveryService,
22+
private readonly metadataAccessor: MetadataAccessorService,
23+
private readonly metadataScanner: MetadataScanner,
24+
) {}
25+
26+
onModuleInit() {
27+
this.explore();
28+
}
29+
30+
explore() {
31+
const providers: InstanceWrapper[] = this.discoveryService
32+
.getProviders()
33+
.filter((wrapper: InstanceWrapper) =>
34+
this.metadataAccessor.isListener(wrapper.metatype),
35+
);
36+
37+
providers.forEach((wrapper: InstanceWrapper) => {
38+
const { instance } = wrapper;
39+
40+
this.metadataScanner.scanFromPrototype(
41+
instance,
42+
Object.getPrototypeOf(instance),
43+
(key: string) => {
44+
if (this.metadataAccessor.isWorkerEvent(instance[key])) {
45+
const event = this.metadataAccessor.getListenerMetadata(
46+
instance[key],
47+
);
48+
49+
this.listeners.push({
50+
event,
51+
callback: (...args) => instance[key](...args),
52+
});
53+
54+
this.logger.debug(
55+
`Register ${event} from ${
56+
(instance as Object).constructor.name
57+
}.${key}`,
58+
);
59+
}
60+
},
61+
);
62+
});
63+
}
64+
}

0 commit comments

Comments
 (0)