Skip to content

fix(pubsub): enhance message validation and routing robustness #3223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 138 additions & 60 deletions packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu

this.log('starting')

// Reset rate limiting counters periodically
this._rateLimitInterval = setInterval(() => {
this._peerMessageCounts.clear()
}, 60000) // Reset every minute

const registrar = this.components.registrar
// Incoming streams
// Called after a peer dials us
Expand All @@ -169,13 +174,18 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
}

/**
* Unregister the pubsub protocol and the streams with other peers will be closed.
* Unregister the pubsub protocol and stop it
*/
async stop (): Promise<void> {
if (!this.started || !this.enabled) {
if (!this.started) {
return
}

// Clear rate limiting interval
if (this._rateLimitInterval != null) {
clearInterval(this._rateLimitInterval)
}

const registrar = this.components.registrar

// unregister protocol and handlers
Expand Down Expand Up @@ -451,29 +461,58 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
* Handles a message from a peer
*/
async processMessage (from: PeerId, msg: Message): Promise<void> {
if (this.components.peerId.equals(from) && !this.emitSelf) {
return
}
// Add metrics for message processing
this.components.metrics?.increment('libp2p_pubsub_message_processed_total')
const processingStart = performance.now()

// Ensure the message is valid before processing it
try {
await this.validate(from, msg)
} catch (err: any) {
this.log('Message is invalid, dropping it. %O', err)
return
}
if (this.components.peerId.equals(from) && !this.emitSelf) {
return
}

if (this.subscriptions.has(msg.topic)) {
const isFromSelf = this.components.peerId.equals(from)
// Rate limiting per peer
const peerMessageCount = this._peerMessageCounts.get(from.toString()) ?? 0
const MAX_MESSAGES_PER_PEER = 100 // Messages per minute
if (peerMessageCount > MAX_MESSAGES_PER_PEER) {
this.log('Rate limit exceeded for peer %p', from)
this.components.metrics?.increment('libp2p_pubsub_rate_limit_exceeded')
return
}
this._peerMessageCounts.set(from.toString(), peerMessageCount + 1)

if (!isFromSelf || this.emitSelf) {
super.dispatchEvent(new CustomEvent<Message>('message', {
detail: msg
}))
// Ensure the message is valid before processing it
try {
await this.validate(from, msg)
} catch (err: any) {
this.log('Message is invalid, dropping it. %O', err)
this.components.metrics?.increment('libp2p_pubsub_message_dropped_invalid')
return
}
}

await this.publishMessage(from, msg)
if (this.subscriptions.has(msg.topic)) {
const isFromSelf = this.components.peerId.equals(from)

if (!isFromSelf || this.emitSelf) {
super.dispatchEvent(new CustomEvent<Message>('message', {
detail: msg
}))
this.components.metrics?.increment('libp2p_pubsub_message_delivered')
}
}

await this.publishMessage(from, msg)

// Track successful processing
const processingTime = performance.now() - processingStart
this.components.metrics?.histogram('libp2p_pubsub_message_processing_time', processingTime)

} catch (err) {
// Track failed processing
this.components.metrics?.increment('libp2p_pubsub_message_processing_failed')
const processingTime = performance.now() - processingStart
this.components.metrics?.histogram('libp2p_pubsub_message_processing_time', processingTime)
this.log.error('Failed to process message', err)
}
}

/**
Expand Down Expand Up @@ -568,56 +607,91 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
* Throws an error on invalid messages
*/
async validate (from: PeerId, message: Message): Promise<void> {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
if (message.type !== 'unsigned') {
throw new InvalidMessageError('Message type should be "unsigned" when signature policy is StrictNoSign but it was not')
}
// Add metrics for validation attempts
this.components.metrics?.increment('libp2p_pubsub_message_validation_total')
const validationStart = performance.now()

// @ts-expect-error should not be present
if (message.signature != null) {
throw new InvalidMessageError('StrictNoSigning: signature should not be present')
}
try {
// Basic message validation
if (message.topic == null || message.topic === '') {
throw new InvalidMessageError('Message must have a valid topic')
}

// @ts-expect-error should not be present
if (message.key != null) {
throw new InvalidMessageError('StrictNoSigning: key should not be present')
}
if (message.data == null) {
throw new InvalidMessageError('Message must have data')
}

// @ts-expect-error should not be present
if (message.sequenceNumber != null) {
throw new InvalidMessageError('StrictNoSigning: seqno should not be present')
}
break
case 'StrictSign':
if (message.type !== 'signed') {
throw new InvalidMessageError('Message type should be "signed" when signature policy is StrictSign but it was not')
}
// Check message size
const MAX_MESSAGE_SIZE = 1024 * 1024 // 1MB
if (message.data.length > MAX_MESSAGE_SIZE) {
throw new InvalidMessageError(`Message size exceeds limit of ${MAX_MESSAGE_SIZE} bytes`)
}

if (message.signature == null) {
throw new InvalidMessageError('StrictSigning: Signing required and no signature was present')
}
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
if (message.type !== 'unsigned') {
throw new InvalidMessageError('Message type should be "unsigned" when signature policy is StrictNoSign but it was not')
}

if (message.sequenceNumber == null) {
throw new InvalidMessageError('StrictSigning: Signing required and no sequenceNumber was present')
}
// @ts-expect-error should not be present
if (message.signature != null) {
throw new InvalidMessageError('StrictNoSigning: signature should not be present')
}

if (!(await verifySignature(message, this.encodeMessage.bind(this)))) {
throw new InvalidMessageError('StrictSigning: Invalid message signature')
}
// @ts-expect-error should not be present
if (message.key != null) {
throw new InvalidMessageError('StrictNoSigning: key should not be present')
}

break
default:
throw new InvalidMessageError('Cannot validate message: unhandled signature policy')
}
// @ts-expect-error should not be present
if (message.sequenceNumber != null) {
throw new InvalidMessageError('StrictNoSigning: seqno should not be present')
}
break
case 'StrictSign':
if (message.type !== 'signed') {
throw new InvalidMessageError('Message type should be "signed" when signature policy is StrictSign but it was not')
}

if (message.signature == null) {
throw new InvalidMessageError('StrictSigning: Signing required and no signature was present')
}

if (message.sequenceNumber == null) {
throw new InvalidMessageError('StrictSigning: Signing required and no sequenceNumber was present')
}

const validatorFn = this.topicValidators.get(message.topic)
if (validatorFn != null) {
const result = await validatorFn(from, message)
if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) {
throw new InvalidMessageError('Message validation failed')
if (!(await verifySignature(message, this.encodeMessage.bind(this)))) {
this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_signature')
throw new InvalidMessageError('StrictSigning: Invalid message signature')
}

break
default:
throw new InvalidMessageError('Cannot validate message: unhandled signature policy')
}

const validatorFn = this.topicValidators.get(message.topic)
if (validatorFn != null) {
const result = await validatorFn(from, message)
if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) {
this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_topic')
throw new InvalidMessageError('Message validation failed')
}
}

// Track successful validation
this.components.metrics?.increment('libp2p_pubsub_message_validation_success')
const validationTime = performance.now() - validationStart
this.components.metrics?.histogram('libp2p_pubsub_message_validation_time', validationTime)

} catch (err) {
// Track failed validation
this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_total')
const validationTime = performance.now() - validationStart
this.components.metrics?.histogram('libp2p_pubsub_message_validation_time', validationTime)
throw err
}
}

Expand Down Expand Up @@ -772,4 +846,8 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu

return Array.from(this.peers.keys())
}

// Add rate limiting state
private _peerMessageCounts: Map<string, number> = new Map()
private _rateLimitInterval: ReturnType<typeof setInterval>
}