Skip to content

Commit ce0e311

Browse files
committed
chore: refactor relay entrypoint
1 parent 7dc1cc8 commit ce0e311

File tree

7 files changed

+158
-91
lines changed

7 files changed

+158
-91
lines changed

src/@types/base.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ export type Range<F extends number, T extends number> = Exclude<
2626
export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput
2727

2828
export type DatabaseClient = Knex
29+
30+
export interface IRunnable {
31+
run(): void
32+
}

src/adapters/web-server-adapter.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter
1818
}
1919

2020
public listen(port: number): void {
21-
console.log('Listening on port:', port)
2221
this.webServer.listen(port)
2322
}
2423

src/app/app.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import cluster, { Worker } from 'cluster'
2+
import { cpus } from 'os'
3+
import process from 'process'
4+
5+
import { IRunnable } from '../@types/base'
6+
import { ISettings } from '../@types/settings'
7+
import packageJson from '../../package.json'
8+
import { Serializable } from 'child_process'
9+
10+
export class App implements IRunnable {
11+
public constructor(
12+
private readonly settingsFactory: () => ISettings,
13+
) {
14+
15+
cluster
16+
.on('message', this.onClusterMessage.bind(this))
17+
.on('exit', this.onClusterExit.bind(this))
18+
19+
process
20+
.on('SIGTERM', this.onExit.bind(this))
21+
}
22+
23+
public run(): void {
24+
console.log(`${packageJson.name}@${packageJson.version}`)
25+
console.log(`supported NIPs: ${packageJson.supportedNips}`)
26+
console.log(`primary ${process.pid} - running`)
27+
28+
const workerCount = this.settingsFactory().workers?.count || cpus().length
29+
30+
for (let i = 0; i < workerCount; i++) {
31+
cluster.fork()
32+
}
33+
}
34+
35+
private onClusterMessage(source: Worker, message: Serializable) {
36+
for (const worker of Object.values(cluster.workers)) {
37+
if (source.id === worker.id) {
38+
continue
39+
}
40+
41+
worker.send(message)
42+
}
43+
}
44+
45+
private onClusterExit(deadWorker: Worker, code: number, signal: string) {
46+
console.log(`worker ${deadWorker.process.pid} - exiting`)
47+
if (code === 0 || signal === 'SIGINT') {
48+
return
49+
}
50+
const worker = cluster.fork()
51+
52+
const newPID = worker.process.pid
53+
const oldPID = deadWorker.process.pid
54+
55+
console.log('worker ' + oldPID + ' died.')
56+
console.log('worker ' + newPID + ' born.')
57+
}
58+
59+
private onExit() {
60+
console.log('exiting...')
61+
process.exit(0)
62+
}
63+
}

src/app/worker.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import http from 'http'
2+
import process from 'process'
3+
import { WebSocketServer } from 'ws'
4+
5+
import { createSettings } from '../factories/settings-factory'
6+
import { IEventRepository } from '../@types/repositories'
7+
import { IRunnable } from '../@types/base'
8+
import { webSocketAdapterFactory } from '../factories/websocket-adapter-factory'
9+
import { WebSocketServerAdapter } from '../adapters/web-socket-server-adapter'
10+
11+
export class AppWorker implements IRunnable {
12+
private webServer: http.Server
13+
private wss: WebSocketServer
14+
private adapter: WebSocketServerAdapter
15+
16+
public constructor(
17+
private readonly eventRepository: IEventRepository
18+
) {
19+
this.webServer = http.createServer()
20+
this.wss = new WebSocketServer({ server: this.webServer, maxPayload: 1024 * 1024 })
21+
this.adapter = new WebSocketServerAdapter(
22+
this.webServer,
23+
this.wss,
24+
webSocketAdapterFactory(this.eventRepository),
25+
createSettings,
26+
)
27+
28+
process
29+
.on('message', (message: { eventName: string, event: unknown }) => {
30+
this.adapter.emit(message.eventName, message.event)
31+
})
32+
.on('SIGINT', this.onExit.bind(this))
33+
.on('SIGHUP', this.onExit.bind(this))
34+
.on('SIGTERM', this.onExit.bind(this))
35+
.on('uncaughtException', this.onError.bind(this))
36+
.on('unhandledRejection', this.onError.bind(this))
37+
}
38+
39+
public run(): void {
40+
const port = Number(process.env.SERVER_PORT) || 8008
41+
42+
this.adapter.listen(port)
43+
44+
console.log(`worker ${process.pid} - listening on port ${port}`)
45+
}
46+
47+
private onError(error: Error) {
48+
console.error(`worker ${process.pid} - error`, error)
49+
throw error
50+
}
51+
52+
private onExit() {
53+
console.log(`worker ${process.pid} - exiting`)
54+
this.wss.close(() => {
55+
this.webServer.close(() => {
56+
// dbClient.destroy(() => {
57+
// process.exit(0)
58+
// })
59+
process.exit(0)
60+
})
61+
})
62+
}
63+
}

src/factories/app-factory.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { App } from '../app/app'
2+
import { SettingsStatic } from '../utils/settings'
3+
4+
export const appFactory = () => {
5+
return new App(SettingsStatic.createSettings)
6+
}

src/factories/worker-factory.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { AppWorker } from '../app/worker'
2+
import { EventRepository } from '../repositories/event-repository'
3+
import { getDbClient } from '../database/client'
4+
5+
export const workerFactory = () => {
6+
const dbClient = getDbClient()
7+
const eventRepository = new EventRepository(dbClient)
8+
9+
return new AppWorker(
10+
eventRepository
11+
)
12+
}

src/index.ts

Lines changed: 10 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,15 @@
1-
import cluster, { Worker } from 'cluster'
2-
import { cpus } from 'os'
3-
import http from 'http'
4-
import process from 'process'
5-
import { WebSocketServer } from 'ws'
1+
import cluster from 'cluster'
62

7-
import { createSettings } from './factories/settings-factory'
8-
import { EventRepository } from './repositories/event-repository'
9-
import { getDbClient } from './database/client'
10-
import packageJson from '../package.json'
11-
import { webSocketAdapterFactory } from './factories/websocket-adapter-factory'
12-
import { WebSocketServerAdapter } from './adapters/web-socket-server-adapter'
3+
import { appFactory } from './factories/app-factory'
4+
import { workerFactory } from './factories/worker-factory'
135

14-
const newWorker = (): Worker => {
15-
return cluster.fork()
6+
export const run = (isPrimary: boolean) => {
7+
return (isPrimary)
8+
? appFactory
9+
: workerFactory
1610
}
1711

18-
if (cluster.isPrimary) {
19-
console.log(`${packageJson.name}@${packageJson.version}`)
20-
console.log(`supported NIPs: ${packageJson.supportedNips}`)
21-
console.log(`primary ${process.pid} - running`)
22-
23-
const numCpus = cpus().length
24-
25-
for (let i = 0; i < numCpus; i++) {
26-
newWorker()
27-
}
28-
29-
cluster.on('exit', (deadWorker, code, signal) => {
30-
console.log(`worker ${deadWorker.process.pid} - exiting`)
31-
if (code === 0 || signal === 'SIGINT') {
32-
return
33-
}
34-
const worker = newWorker()
35-
36-
const newPID = worker.process.pid
37-
const oldPID = deadWorker.process.pid
38-
39-
console.log('worker ' + oldPID + ' died.')
40-
console.log('worker ' + newPID + ' born.')
41-
})
42-
43-
cluster.on('message', (source, message) => {
44-
for (const worker of Object.values(cluster.workers)) {
45-
if (source.id === worker.id) {
46-
continue
47-
}
48-
49-
worker.send(message)
50-
}
51-
})
52-
53-
process.on('SIGTERM', () => {
54-
console.log('exiting...')
55-
process.exit(0)
56-
})
57-
} else if (cluster.isWorker) {
58-
const port = Number(process.env.SERVER_PORT) || 8008
59-
60-
const dbClient = getDbClient()
61-
const eventRepository = new EventRepository(dbClient)
62-
63-
const server = http.createServer()
64-
const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 })
65-
const adapter = new WebSocketServerAdapter(
66-
server,
67-
wss,
68-
webSocketAdapterFactory(eventRepository),
69-
createSettings,
70-
)
71-
72-
adapter.listen(port)
73-
74-
const exitHandler = () => {
75-
console.log(`worker ${process.pid} - exiting`)
76-
wss.close(() => {
77-
server.close(() => {
78-
dbClient.destroy(() => {
79-
process.exit(0)
80-
})
81-
})
82-
})
83-
}
84-
85-
process.on('SIGINT', exitHandler)
86-
process.on('SIGHUP', exitHandler)
87-
process.on('SIGTERM', exitHandler)
88-
process.on('uncaughtException', exitHandler)
89-
90-
process.on('message', (message: { eventName: string, event: unknown }) => {
91-
adapter.emit(message.eventName, message.event)
92-
})
93-
94-
console.log(`worker ${process.pid} - listening on port ${port}`)
12+
if (require.main === module) {
13+
run(cluster.isPrimary)()
9514
}
15+

0 commit comments

Comments
 (0)