Skip to content

Commit 8637697

Browse files
committed
chore: refactor worker & app factory
1 parent 516c186 commit 8637697

File tree

8 files changed

+58
-66
lines changed

8 files changed

+58
-66
lines changed

src/@types/adapters.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { EventEmitter } from 'node:stream'
22
import { SubscriptionFilter } from './subscription'
33

4-
export interface IWebSocketServerAdapter extends EventEmitter {
4+
export interface IWebSocketServerAdapter extends EventEmitter, IWebServerAdapter {
55
getConnectedClients(): number
6-
terminate(): Promise<void>
6+
close(callback: () => void): void
77
}
88

99
export interface IWebServerAdapter extends EventEmitter {
10-
listen(port: number)
10+
listen(port: number): void
1111
}
1212

1313

src/adapters/web-server-adapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { IWebServerAdapter } from '../@types/adapters'
88
export class WebServerAdapter extends EventEmitter implements IWebServerAdapter {
99

1010
public constructor(
11-
private readonly webServer: Server,
11+
protected readonly webServer: Server,
1212
private readonly settings: () => ISettings,
1313
) {
1414
super()

src/adapters/web-socket-server-adapter.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,10 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock
4343
this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
4444
}
4545

46-
public async terminate(): Promise<void> {
47-
return void Promise.all(
48-
[
49-
...Array.from(this.webSocketServer.clients).map((webSocket: WebSocket) =>
50-
webSocket.terminate()
51-
),
52-
],
53-
)
46+
public close(callback: () => void): void {
47+
this.webSocketServer.close(() => {
48+
this.webServer.close(callback)
49+
})
5450
}
5551

5652
private onBroadcast(event: Event) {

src/app/app.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import cluster, { Worker } from 'cluster'
1+
import { Cluster, Worker } from 'cluster'
22
import { cpus } from 'os'
3-
import process from 'process'
43

54
import { IRunnable } from '../@types/base'
65
import { ISettings } from '../@types/settings'
@@ -9,31 +8,32 @@ import { Serializable } from 'child_process'
98

109
export class App implements IRunnable {
1110
public constructor(
11+
private readonly process: NodeJS.Process,
12+
private readonly cluster: Cluster,
1213
private readonly settingsFactory: () => ISettings,
1314
) {
14-
15-
cluster
15+
this.cluster
1616
.on('message', this.onClusterMessage.bind(this))
1717
.on('exit', this.onClusterExit.bind(this))
1818

19-
process
19+
this.process
2020
.on('SIGTERM', this.onExit.bind(this))
2121
}
2222

2323
public run(): void {
2424
console.log(`${packageJson.name}@${packageJson.version}`)
2525
console.log(`supported NIPs: ${packageJson.supportedNips}`)
26-
console.log(`primary ${process.pid} - running`)
26+
console.log(`primary ${this.process.pid} - running`)
2727

2828
const workerCount = this.settingsFactory().workers?.count || cpus().length
2929

3030
for (let i = 0; i < workerCount; i++) {
31-
cluster.fork()
31+
this.cluster.fork()
3232
}
3333
}
3434

3535
private onClusterMessage(source: Worker, message: Serializable) {
36-
for (const worker of Object.values(cluster.workers)) {
36+
for (const worker of Object.values(this.cluster.workers)) {
3737
if (source.id === worker.id) {
3838
continue
3939
}
@@ -47,17 +47,12 @@ export class App implements IRunnable {
4747
if (code === 0 || signal === 'SIGINT') {
4848
return
4949
}
50-
const worker = cluster.fork()
51-
52-
const newPID = worker.process.pid
53-
const oldPID = deadWorker.process.pid
5450

55-
console.log('worker ' + oldPID + ' died.')
56-
console.log('worker ' + newPID + ' born.')
51+
this.cluster.fork()
5752
}
5853

5954
private onExit() {
6055
console.log('exiting...')
61-
process.exit(0)
56+
this.process.exit(0)
6257
}
6358
}

src/app/worker.ts

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,13 @@
1-
import http from 'http'
2-
import process from 'process'
3-
import { WebSocketServer } from 'ws'
4-
5-
import { createSettings } from '../factories/settings-factory'
6-
import { IEventRepository } from '../@types/repositories'
71
import { IRunnable } from '../@types/base'
8-
import { webSocketAdapterFactory } from '../factories/websocket-adapter-factory'
9-
import { WebSocketServerAdapter } from '../adapters/web-socket-server-adapter'
2+
import { IWebSocketServerAdapter } from '../@types/adapters'
103

114
export class AppWorker implements IRunnable {
12-
private webServer: http.Server
13-
private wss: WebSocketServer
14-
private adapter: WebSocketServerAdapter
15-
165
public constructor(
17-
private readonly eventRepository: IEventRepository
6+
private readonly process: NodeJS.Process,
7+
private readonly adapter: IWebSocketServerAdapter
188
) {
19-
this.webServer = http.createServer()
20-
this.wss = new WebSocketServer({ server: this.webServer, maxPayload: 1024 * 1024 })
21-
this.adapter = new WebSocketServerAdapter(
22-
this.webServer,
23-
this.wss,
24-
webSocketAdapterFactory(this.eventRepository),
25-
createSettings,
26-
)
27-
289
process
29-
.on('message', (message: { eventName: string, event: unknown }) => {
30-
this.adapter.emit(message.eventName, message.event)
31-
})
10+
.on('message', this.onMessage.bind(this))
3211
.on('SIGINT', this.onExit.bind(this))
3312
.on('SIGHUP', this.onExit.bind(this))
3413
.on('SIGTERM', this.onExit.bind(this))
@@ -44,20 +23,22 @@ export class AppWorker implements IRunnable {
4423
console.log(`worker ${process.pid} - listening on port ${port}`)
4524
}
4625

26+
private onMessage(message: { eventName: string, event: unknown }): void {
27+
this.adapter.emit(message.eventName, message.event)
28+
}
29+
4730
private onError(error: Error) {
4831
console.error(`worker ${process.pid} - error`, error)
4932
throw error
5033
}
5134

5235
private onExit() {
5336
console.log(`worker ${process.pid} - exiting`)
54-
this.wss.close(() => {
55-
this.webServer.close(() => {
56-
// dbClient.destroy(() => {
57-
// process.exit(0)
58-
// })
59-
process.exit(0)
60-
})
37+
this.adapter.close(() => {
38+
// dbClient.destroy(() => {
39+
// process.exit(0)
40+
// })
41+
process.exit(0)
6142
})
6243
}
6344
}

src/factories/app-factory.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import cluster from 'cluster'
2+
import process from 'process'
3+
14
import { App } from '../app/app'
25
import { SettingsStatic } from '../utils/settings'
36

47
export const appFactory = () => {
5-
return new App(SettingsStatic.createSettings)
8+
return new App(process, cluster, SettingsStatic.createSettings)
69
}

src/factories/worker-factory.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
1+
import http from 'http'
2+
import process from 'process'
3+
import { WebSocketServer } from 'ws'
4+
15
import { AppWorker } from '../app/worker'
6+
import { createSettings } from '../factories/settings-factory'
27
import { EventRepository } from '../repositories/event-repository'
38
import { getDbClient } from '../database/client'
9+
import { webSocketAdapterFactory } from './websocket-adapter-factory'
10+
import { WebSocketServerAdapter } from '../adapters/web-socket-server-adapter'
411

512
export const workerFactory = () => {
613
const dbClient = getDbClient()
714
const eventRepository = new EventRepository(dbClient)
815

9-
return new AppWorker(
10-
eventRepository
16+
const server = http.createServer()
17+
const webSocketServer = new WebSocketServer({
18+
server,
19+
maxPayload: 1024 * 1024,
20+
})
21+
const adapter = new WebSocketServerAdapter(
22+
server,
23+
webSocketServer,
24+
webSocketAdapterFactory(eventRepository),
25+
createSettings,
1126
)
27+
28+
return new AppWorker(process, adapter)
1229
}

src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ import cluster from 'cluster'
33
import { appFactory } from './factories/app-factory'
44
import { workerFactory } from './factories/worker-factory'
55

6-
export const run = (isPrimary: boolean) => {
6+
export const getRunner = (isPrimary: boolean) => {
77
return (isPrimary)
8-
? appFactory
9-
: workerFactory
8+
? appFactory()
9+
: workerFactory()
1010
}
1111

1212
if (require.main === module) {
13-
run(cluster.isPrimary)()
13+
getRunner(cluster.isPrimary).run()
1414
}

0 commit comments

Comments
 (0)