Skip to content

Commit ff9b87f

Browse files
committed
feat: rate limit msgs/events, send command results
1 parent a46fcc6 commit ff9b87f

12 files changed

+184
-68
lines changed

src/@types/base.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export type Range<F extends number, T extends number> = Exclude<
2323
Enumerate<F>
2424
>
2525

26-
export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput
26+
export type Factory<TOutput = any, TInput = void> = (input: TInput) => TOutput
2727

2828
export type DatabaseClient = Knex
2929

src/@types/messages.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1+
import { EventId, Range } from './base'
12
import { SubscriptionFilter, SubscriptionId } from './subscription'
23
import { Event } from './event'
3-
import { Range } from './base'
44

55
export enum MessageType {
66
REQ = 'REQ',
77
EVENT = 'EVENT',
88
CLOSE = 'CLOSE',
99
NOTICE = 'NOTICE',
1010
EOSE = 'EOSE',
11+
OK = 'OK'
1112
}
1213

1314
export type IncomingMessage =
@@ -20,6 +21,7 @@ export type OutgoingMessage =
2021
| OutgoingEventMessage
2122
| EndOfStoredEventsNotice
2223
| NoticeMessage
24+
| CommandResult
2325

2426
export type SubscribeMessage = {
2527
[index in Range<2, 100>]: SubscriptionFilter
@@ -51,6 +53,13 @@ export interface NoticeMessage {
5153
1: string
5254
}
5355

56+
export interface CommandResult {
57+
0: MessageType.OK
58+
1: EventId
59+
2: boolean
60+
3: string
61+
}
62+
5463
export interface EndOfStoredEventsNotice {
5564
0: MessageType.EOSE
5665
1: SubscriptionId

src/adapters/web-socket-adapter.ts

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { attemptValidation } from '../utils/validation'
1313
import { createLogger } from '../factories/logger-factory'
1414
import { Event } from '../@types/event'
1515
import { Factory } from '../@types/base'
16+
import { IRateLimiter } from '../@types/utils'
17+
import { ISettings } from '../@types/settings'
1618
import { isEventMatchingFilter } from '../utils/event'
1719
import { messageSchema } from '../schemas/message-schema'
1820

@@ -21,7 +23,7 @@ const debugHeartbeat = debug.extend('heartbeat')
2123

2224
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
2325
public clientId: string
24-
// private clientAddress: string
26+
private clientAddress: string
2527
private alive: boolean
2628
private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
2729

@@ -30,13 +32,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
3032
private readonly request: IncomingHttpMessage,
3133
private readonly webSocketServer: IWebSocketServerAdapter,
3234
private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
35+
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
36+
private readonly settingsFactory: Factory<ISettings>,
3337
) {
3438
super()
3539
this.alive = true
3640
this.subscriptions = new Map()
3741

3842
this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
39-
// this.clientAddress = this.request.headers['x-forwarded-for'] as string
43+
this.clientAddress = (this.request.headers['x-forwarded-for'] ?? this.request.socket.remoteAddress) as string
44+
45+
debug('client %s from address %s', this.clientId, this.clientAddress)
4046

4147
this.client
4248
.on('message', this.onClientMessage.bind(this))
@@ -120,10 +126,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
120126
private async onClientMessage(raw: Buffer) {
121127
let abort: () => void
122128
try {
129+
if (await this.isRateLimited(this.clientAddress)) {
130+
this.sendMessage(createNoticeMessage('rate limited'))
131+
return
132+
}
133+
123134
const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf8')))
124135

125136
const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
126-
if (typeof messageHandler.abort === 'function') {
137+
if (typeof messageHandler?.abort === 'function') {
127138
abort = messageHandler.abort.bind(messageHandler)
128139
this.client.prependOnceListener('close', abort)
129140
}
@@ -145,6 +156,36 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
145156
}
146157
}
147158

159+
private async isRateLimited(client: string): Promise<boolean> {
160+
const {
161+
rateLimits,
162+
ipWhitelist = [],
163+
} = this.settingsFactory().limits?.message ?? {}
164+
165+
if (ipWhitelist.includes(client)) {
166+
debug('rate limit check %s: skipped', client)
167+
return false
168+
}
169+
170+
const rateLimiter = this.slidingWindowRateLimiter()
171+
172+
const hit = (period: number, rate: number) =>
173+
rateLimiter.hit(
174+
`${client}:message:${period}`,
175+
1,
176+
{ period: period, rate: rate },
177+
)
178+
179+
const hits = await Promise.all(
180+
rateLimits
181+
.map(({ period, rate }) => hit(period, rate))
182+
)
183+
184+
debug('rate limit check %s: %o = %o', client, rateLimits.map(({ period }) => period), hits)
185+
186+
return hits.some((thresholdCrossed) => thresholdCrossed)
187+
}
188+
148189
private onClientPong() {
149190
debugHeartbeat('client %s pong', this.clientId)
150191
this.alive = true

src/factories/websocket-adapter-factory.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { IncomingMessage } from 'http'
22
import { WebSocket } from 'ws'
33

4+
import { createSettings } from './settings-factory'
45
import { IEventRepository } from '../@types/repositories'
56
import { IWebSocketServerAdapter } from '../@types/adapters'
67
import { messageHandlerFactory } from './message-handler-factory'
8+
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
79
import { WebSocketAdapter } from '../adapters/web-socket-adapter'
810

911

@@ -14,5 +16,7 @@ export const webSocketAdapterFactory = (
1416
client,
1517
request,
1618
webSocketServerAdapter,
17-
messageHandlerFactory(eventRepository)
19+
messageHandlerFactory(eventRepository),
20+
slidingWindowRateLimiterFactory,
21+
createSettings,
1822
)
Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import { mergeDeepLeft } from 'ramda'
2-
3-
import { DelegatedEvent, Event } from '../@types/event'
41
import { EventDelegatorMetadataKey, EventTags } from '../constants/base'
2+
import { createCommandResult } from '../utils/messages'
53
import { createLogger } from '../factories/logger-factory'
6-
import { createNoticeMessage } from '../utils/messages'
4+
import { DelegatedEvent } from '../@types/event'
75
import { EventMessageHandler } from './event-message-handler'
86
import { IMessageHandler } from '../@types/message-handlers'
97
import { IncomingEventMessage } from '../@types/messages'
@@ -14,50 +12,58 @@ const debug = createLogger('delegated-event-message-handler')
1412

1513
export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler {
1614
public async handleMessage(message: IncomingEventMessage): Promise<void> {
15+
debug('received message: %o', message)
1716
const [, event] = message
1817

19-
let reason = this.canAcceptEvent(event)
18+
let reason = await this.isEventValid(event)
2019
if (reason) {
2120
debug('event %s rejected: %s', event.id, reason)
22-
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
21+
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
22+
return
23+
}
24+
25+
if (await this.isRateLimited(event)) {
26+
debug('event %s rejected: rate-limited')
27+
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
2328
return
2429
}
2530

26-
reason = await this.isEventValid(event)
31+
reason = this.canAcceptEvent(event)
2732
if (reason) {
2833
debug('event %s rejected: %s', event.id, reason)
29-
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
34+
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
3035
return
3136
}
3237

3338
const [, delegator] = event.tags.find((tag) => tag.length === 4 && tag[0] === EventTags.Delegation)
34-
const delegatedEvent: DelegatedEvent = mergeDeepLeft(
35-
event,
36-
{
39+
const delegatedEvent: DelegatedEvent = {
40+
...event,
3741
[EventDelegatorMetadataKey]: delegator,
38-
}
39-
)
42+
}
4043

4144
const strategy = this.strategyFactory([delegatedEvent, this.webSocket])
4245

4346
if (typeof strategy?.execute !== 'function') {
47+
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
4448
return
4549
}
4650

4751
try {
4852
await strategy.execute(delegatedEvent)
4953
} catch (error) {
5054
debug('error handling message %o: %o', message, error)
55+
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
5156
}
5257
}
5358

54-
protected async isEventValid(event: Event): Promise<string | undefined> {
59+
protected async isEventValid(event: DelegatedEvent): Promise<string | undefined> {
5560
const reason = await super.isEventValid(event)
5661
if (reason) {
5762
return reason
5863
}
64+
5965
if (!await isDelegatedEventValid(event)) {
60-
return `Event with id ${event.id} from ${event.pubkey} is invalid delegated event`
66+
return 'invalid: delegation verification failed'
6167
}
6268
}
6369
}

0 commit comments

Comments
 (0)