Skip to content

Commit cd4c60a

Browse files
committed
feat: implement static mirroring
Signed-off-by: Ricardo Arturo Cabral Mejía <[email protected]>
1 parent 282f7db commit cd4c60a

File tree

11 files changed

+244
-19
lines changed

11 files changed

+244
-19
lines changed

CONFIGURATION.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ Running `nostream` for the first time creates the settings file in `<project_roo
4747
| info.description | Public description of your relay. (e.g. Toronto Bitcoin Group Public Relay) |
4848
| info.pubkey | Relay operator's Nostr pubkey in hex format. |
4949
| info.contact | Relay operator's contact. (e.g. mailto:[email protected]) |
50-
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
51-
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
50+
| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame |
51+
| network.remoteIpHeader | HTTP header from proxy containing IP address from client. |
52+
| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) |
53+
| mirroring.static[].filters | Subscription filters used to mirror. |
54+
| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. |
5255
| workers.count | Number of workers to spin up to handle incoming connections. |
5356
| | Spin workers as many CPUs are available when set to zero. Defaults to zero. |
5457
| limits.event.eventId.minLeadingZeroBits | Leading zero bits required on every incoming event for proof of work. |

resources/default-settings.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ network:
3535
idleTimeout: 60
3636
workers:
3737
count: 0
38+
mirroring:
39+
static: []
3840
limits:
3941
invoice:
4042
rateLimits:

src/@types/base.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ export type TagName = string
77
export type Signature = string
88
export type Tag = TagBase & string[]
99

10+
export type Secret = string
11+
1012
export interface TagBase {
1113
0: TagName
1214
[index: number]: string

src/@types/event.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
import { ContextMetadata, EventId, Pubkey, Tag } from './base'
22
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventKinds } from '../constants/base'
33

4-
export interface Event {
4+
export interface BaseEvent {
55
id: EventId
66
pubkey: Pubkey
77
created_at: number
88
kind: EventKinds
99
tags: Tag[]
1010
sig: string
1111
content: string
12+
}
13+
14+
export interface Event extends BaseEvent {
1215
[ContextMetadataKey]?: ContextMetadata
1316
}
1417

18+
export type RelayedEvent = Event
19+
1520
export type UnsignedEvent = Omit<Event, 'sig'>
1621

1722
export type UnidentifiedEvent = Omit<UnsignedEvent, 'id'>

src/@types/messages.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { ContextMetadata, EventId, Range } from './base'
1+
import { ContextMetadata, EventId, Range, Secret } from './base'
2+
import { Event, RelayedEvent } from './event'
23
import { SubscriptionFilter, SubscriptionId } from './subscription'
34
import { ContextMetadataKey } from '../constants/base'
4-
import { Event } from './event'
55

66
export enum MessageType {
77
REQ = 'REQ',
@@ -36,9 +36,12 @@ export type SubscribeMessage = {
3636

3737
export type IncomingEventMessage = EventMessage & [MessageType.EVENT, Event]
3838

39+
export type IncomingRelayedEventMessage = [MessageType.EVENT, RelayedEvent, Secret]
40+
3941
export interface EventMessage {
4042
0: MessageType.EVENT
4143
1: Event
44+
2?: Secret
4245
}
4346

4447
export interface OutgoingEventMessage {

src/@types/settings.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import { Pubkey, Secret } from './base'
12
import { EventKinds } from '../constants/base'
23
import { MessageType } from './messages'
3-
import { Pubkey } from './base'
4+
import { SubscriptionFilter } from './subscription'
45

56
export interface Info {
67
relay_url: string
@@ -151,11 +152,30 @@ export interface PaymentsProcessors {
151152
zebedee?: ZebedeePaymentsProcessor
152153
}
153154

155+
export interface Local {
156+
secret: Secret
157+
}
158+
159+
export interface Remote {
160+
secret: Secret
161+
}
162+
163+
export interface Mirror {
164+
address: string
165+
filters?: SubscriptionFilter[]
166+
secret?: Secret
167+
}
168+
169+
export interface Mirroring {
170+
static?: Mirror[]
171+
}
172+
154173
export interface Settings {
155174
info: Info
156175
payments?: Payments
157176
paymentsProcessors?: PaymentsProcessors
158177
network: Network
159178
workers?: Worker
160179
limits?: Limits
180+
mirroring?: Mirroring
161181
}

src/app/app.ts

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { SettingsStatic } from '../utils/settings'
1414
const debug = createLogger('app-primary')
1515

1616
export class App implements IRunnable {
17-
private workers: WeakMap<Worker, string>
17+
private workers: WeakMap<Worker, Record<string, string>>
1818
private watchers: FSWatcher[] | undefined
1919

2020
public constructor(
@@ -66,18 +66,32 @@ export class App implements IRunnable {
6666
? Number(process.env.WORKER_COUNT)
6767
: this.settings().workers?.count || cpus().length
6868

69+
const createWorker = (env: Record<string, string>) => {
70+
const worker = this.cluster.fork(env)
71+
this.workers.set(worker, env)
72+
}
73+
6974
for (let i = 0; i < workerCount; i++) {
7075
debug('starting worker')
71-
const worker = this.cluster.fork({
76+
createWorker({
7277
WORKER_TYPE: 'worker',
7378
})
74-
this.workers.set(worker, 'worker')
7579
}
7680

77-
const worker = this.cluster.fork({
81+
createWorker({
7882
WORKER_TYPE: 'maintenance',
7983
})
80-
this.workers.set(worker, 'maintenance')
84+
85+
const mirrors = settings?.mirroring?.static
86+
87+
if (Array.isArray(mirrors) && mirrors.length) {
88+
for (let i = 0; i < mirrors.length; i++) {
89+
createWorker({
90+
WORKER_TYPE: 'static-mirroring',
91+
MIRROR_INDEX: i.toString(),
92+
})
93+
}
94+
}
8195

8296
logCentered(`${workerCount} workers started`, width)
8397

@@ -111,14 +125,13 @@ export class App implements IRunnable {
111125
}
112126
setTimeout(() => {
113127
debug('starting worker')
114-
const workerType = this.workers.get(deadWorker)
115-
if (!workerType) {
128+
const workerEnv = this.workers.get(deadWorker)
129+
if (!workerEnv) {
116130
throw new Error('Mistakes were made')
117131
}
118-
const newWorker = this.cluster.fork({
119-
WORKER_TYPE: workerType,
120-
})
121-
this.workers.set(newWorker, workerType)
132+
const newWorker = this.cluster.fork(workerEnv)
133+
this.workers.set(newWorker, workerEnv)
134+
122135
debug('started worker %s', newWorker.process.pid)
123136
}, 10000)
124137
}

src/app/static-mirroring-worker.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import { anyPass, map, path } from 'ramda'
2+
import { RawData, WebSocket } from 'ws'
3+
import cluster from 'cluster'
4+
import { randomUUID } from 'crypto'
5+
6+
import { createRelayedEventMessage, createSubscriptionMessage } from '../utils/messages'
7+
import { isEventIdValid, isEventMatchingFilter, isEventSignatureValid } from '../utils/event'
8+
import { Mirror, Settings } from '../@types/settings'
9+
import { createLogger } from '../factories/logger-factory'
10+
import { IRunnable } from '../@types/base'
11+
import { OutgoingEventMessage } from '../@types/messages'
12+
import { WebSocketServerAdapterEvent } from '../constants/adapter'
13+
14+
const debug = createLogger('static-mirror-worker')
15+
16+
export class StaticMirroringWorker implements IRunnable {
17+
private client: WebSocket | undefined
18+
private config: Mirror
19+
20+
public constructor(
21+
private readonly process: NodeJS.Process,
22+
private readonly settings: () => Settings,
23+
) {
24+
this.process
25+
.on('message', this.onMessage.bind(this))
26+
.on('SIGINT', this.onExit.bind(this))
27+
.on('SIGHUP', this.onExit.bind(this))
28+
.on('SIGTERM', this.onExit.bind(this))
29+
.on('uncaughtException', this.onError.bind(this))
30+
.on('unhandledRejection', this.onError.bind(this))
31+
}
32+
33+
public run(): void {
34+
const currentSettings = this.settings()
35+
36+
console.log('mirroring', currentSettings.mirroring)
37+
38+
this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror
39+
40+
let since = Math.floor(Date.now() / 1000) - 60*10
41+
42+
const createMirror = (config: Mirror) => {
43+
const subscriptionId = `mirror-${randomUUID()}`
44+
45+
debug('connecting to %s', config.address)
46+
47+
return new WebSocket(config.address, { timeout: 5000 })
48+
.on('open', function () {
49+
debug('connected to %s', config.address)
50+
51+
if (Array.isArray(config.filters) && config.filters?.length) {
52+
const filters = config.filters.map((filter) => ({ ...filter, since }))
53+
54+
debug('subscribing with %s: %o', subscriptionId, filters)
55+
56+
this.send(JSON.stringify(createSubscriptionMessage(subscriptionId, filters)))
57+
}
58+
})
59+
.on('message', async function (raw: RawData) {
60+
try {
61+
const message = JSON.parse(raw.toString('utf8')) as OutgoingEventMessage
62+
debug('received: %o', message)
63+
64+
if (!Array.isArray(message)) {
65+
return
66+
}
67+
68+
if (message[0] !== 'EVENT' || message[1] !== subscriptionId) {
69+
return
70+
}
71+
72+
const event = message[2]
73+
74+
if (!anyPass(map(isEventMatchingFilter, config.filters))(event)) {
75+
return
76+
}
77+
78+
if (!await isEventIdValid(event) || !await isEventSignatureValid(event)) {
79+
return
80+
}
81+
82+
since = Math.floor(Date.now()) - 30
83+
84+
if (cluster.isWorker && typeof process.send === 'function') {
85+
process.send({
86+
eventName: WebSocketServerAdapterEvent.Broadcast,
87+
event,
88+
source: config.address,
89+
})
90+
}
91+
} catch (error) {
92+
debug('unable to process message: %o', error)
93+
}
94+
})
95+
.on('close', (code, reason) => {
96+
debug(`disconnected (${code}): ${reason.toString()}`)
97+
98+
setTimeout(() => {
99+
this.client.removeAllListeners()
100+
this.client = createMirror(config)
101+
}, 5000)
102+
})
103+
.on('error', function (error) {
104+
debug('connection error: %o', error)
105+
})
106+
}
107+
108+
this.client = createMirror(this.config)
109+
}
110+
111+
private onMessage(message: { eventName: string, event: unknown, source: string }): void {
112+
if (
113+
message.eventName !== WebSocketServerAdapterEvent.Broadcast
114+
|| message.source === this.config.address
115+
|| !this.client
116+
|| this.client.readyState !== WebSocket.OPEN
117+
) {
118+
return
119+
}
120+
121+
debug('received broadcast: %o', message.event)
122+
123+
const eventToRelay = createRelayedEventMessage(message.event as any, this.config.secret)
124+
debug('relaying: %o', eventToRelay)
125+
this.client.send(JSON.stringify(eventToRelay))
126+
}
127+
128+
private onError(error: Error) {
129+
debug('error: %o', error)
130+
throw error
131+
}
132+
133+
private onExit() {
134+
debug('exiting')
135+
this.close(() => {
136+
this.process.exit(0)
137+
})
138+
}
139+
140+
public close(callback?: () => void) {
141+
debug('closing')
142+
if (this.client) {
143+
this.client.terminate()
144+
}
145+
if (typeof callback === 'function') {
146+
callback()
147+
}
148+
}
149+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { createSettings } from './settings-factory'
2+
import { StaticMirroringWorker } from '../app/static-mirroring-worker'
3+
4+
export const staticMirroringWorkerFactory = () => {
5+
return new StaticMirroringWorker(process, createSettings)
6+
}

src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dotenv.config()
44

55
import { appFactory } from './factories/app-factory'
66
import { maintenanceWorkerFactory } from './factories/maintenance-worker-factory'
7+
import { staticMirroringWorkerFactory } from './factories/static-mirroring.worker-factory'
78
import { workerFactory } from './factories/worker-factory'
89

910
export const getRunner = () => {
@@ -15,6 +16,8 @@ export const getRunner = () => {
1516
return workerFactory()
1617
case 'maintenance':
1718
return maintenanceWorkerFactory()
19+
case 'static-mirroring':
20+
return staticMirroringWorkerFactory()
1821
default:
1922
throw new Error(`Unknown worker: ${process.env.WORKER_TYPE}`)
2023
}

0 commit comments

Comments
 (0)