diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index cb6e0a7bbe..32e0d59c37 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -146,6 +146,11 @@ export abstract class PubSubBaseProtocol = Pu this.log('starting') + // Reset rate limiting counters periodically + this._rateLimitInterval = setInterval(() => { + this._peerMessageCounts.clear() + }, 60000) // Reset every minute + const registrar = this.components.registrar // Incoming streams // Called after a peer dials us @@ -169,13 +174,18 @@ export abstract class PubSubBaseProtocol = Pu } /** - * Unregister the pubsub protocol and the streams with other peers will be closed. + * Unregister the pubsub protocol and stop it */ async stop (): Promise { - if (!this.started || !this.enabled) { + if (!this.started) { return } + // Clear rate limiting interval + if (this._rateLimitInterval != null) { + clearInterval(this._rateLimitInterval) + } + const registrar = this.components.registrar // unregister protocol and handlers @@ -451,29 +461,58 @@ export abstract class PubSubBaseProtocol = Pu * Handles a message from a peer */ async processMessage (from: PeerId, msg: Message): Promise { - if (this.components.peerId.equals(from) && !this.emitSelf) { - return - } + // Add metrics for message processing + this.components.metrics?.increment('libp2p_pubsub_message_processed_total') + const processingStart = performance.now() - // Ensure the message is valid before processing it try { - await this.validate(from, msg) - } catch (err: any) { - this.log('Message is invalid, dropping it. %O', err) - return - } + if (this.components.peerId.equals(from) && !this.emitSelf) { + return + } - if (this.subscriptions.has(msg.topic)) { - const isFromSelf = this.components.peerId.equals(from) + // Rate limiting per peer + const peerMessageCount = this._peerMessageCounts.get(from.toString()) ?? 0 + const MAX_MESSAGES_PER_PEER = 100 // Messages per minute + if (peerMessageCount > MAX_MESSAGES_PER_PEER) { + this.log('Rate limit exceeded for peer %p', from) + this.components.metrics?.increment('libp2p_pubsub_rate_limit_exceeded') + return + } + this._peerMessageCounts.set(from.toString(), peerMessageCount + 1) - if (!isFromSelf || this.emitSelf) { - super.dispatchEvent(new CustomEvent('message', { - detail: msg - })) + // Ensure the message is valid before processing it + try { + await this.validate(from, msg) + } catch (err: any) { + this.log('Message is invalid, dropping it. %O', err) + this.components.metrics?.increment('libp2p_pubsub_message_dropped_invalid') + return } - } - await this.publishMessage(from, msg) + if (this.subscriptions.has(msg.topic)) { + const isFromSelf = this.components.peerId.equals(from) + + if (!isFromSelf || this.emitSelf) { + super.dispatchEvent(new CustomEvent('message', { + detail: msg + })) + this.components.metrics?.increment('libp2p_pubsub_message_delivered') + } + } + + await this.publishMessage(from, msg) + + // Track successful processing + const processingTime = performance.now() - processingStart + this.components.metrics?.histogram('libp2p_pubsub_message_processing_time', processingTime) + + } catch (err) { + // Track failed processing + this.components.metrics?.increment('libp2p_pubsub_message_processing_failed') + const processingTime = performance.now() - processingStart + this.components.metrics?.histogram('libp2p_pubsub_message_processing_time', processingTime) + this.log.error('Failed to process message', err) + } } /** @@ -568,56 +607,91 @@ export abstract class PubSubBaseProtocol = Pu * Throws an error on invalid messages */ async validate (from: PeerId, message: Message): Promise { - const signaturePolicy = this.globalSignaturePolicy - switch (signaturePolicy) { - case 'StrictNoSign': - if (message.type !== 'unsigned') { - throw new InvalidMessageError('Message type should be "unsigned" when signature policy is StrictNoSign but it was not') - } + // Add metrics for validation attempts + this.components.metrics?.increment('libp2p_pubsub_message_validation_total') + const validationStart = performance.now() - // @ts-expect-error should not be present - if (message.signature != null) { - throw new InvalidMessageError('StrictNoSigning: signature should not be present') - } + try { + // Basic message validation + if (message.topic == null || message.topic === '') { + throw new InvalidMessageError('Message must have a valid topic') + } - // @ts-expect-error should not be present - if (message.key != null) { - throw new InvalidMessageError('StrictNoSigning: key should not be present') - } + if (message.data == null) { + throw new InvalidMessageError('Message must have data') + } - // @ts-expect-error should not be present - if (message.sequenceNumber != null) { - throw new InvalidMessageError('StrictNoSigning: seqno should not be present') - } - break - case 'StrictSign': - if (message.type !== 'signed') { - throw new InvalidMessageError('Message type should be "signed" when signature policy is StrictSign but it was not') - } + // Check message size + const MAX_MESSAGE_SIZE = 1024 * 1024 // 1MB + if (message.data.length > MAX_MESSAGE_SIZE) { + throw new InvalidMessageError(`Message size exceeds limit of ${MAX_MESSAGE_SIZE} bytes`) + } - if (message.signature == null) { - throw new InvalidMessageError('StrictSigning: Signing required and no signature was present') - } + const signaturePolicy = this.globalSignaturePolicy + switch (signaturePolicy) { + case 'StrictNoSign': + if (message.type !== 'unsigned') { + throw new InvalidMessageError('Message type should be "unsigned" when signature policy is StrictNoSign but it was not') + } - if (message.sequenceNumber == null) { - throw new InvalidMessageError('StrictSigning: Signing required and no sequenceNumber was present') - } + // @ts-expect-error should not be present + if (message.signature != null) { + throw new InvalidMessageError('StrictNoSigning: signature should not be present') + } - if (!(await verifySignature(message, this.encodeMessage.bind(this)))) { - throw new InvalidMessageError('StrictSigning: Invalid message signature') - } + // @ts-expect-error should not be present + if (message.key != null) { + throw new InvalidMessageError('StrictNoSigning: key should not be present') + } - break - default: - throw new InvalidMessageError('Cannot validate message: unhandled signature policy') - } + // @ts-expect-error should not be present + if (message.sequenceNumber != null) { + throw new InvalidMessageError('StrictNoSigning: seqno should not be present') + } + break + case 'StrictSign': + if (message.type !== 'signed') { + throw new InvalidMessageError('Message type should be "signed" when signature policy is StrictSign but it was not') + } + + if (message.signature == null) { + throw new InvalidMessageError('StrictSigning: Signing required and no signature was present') + } + + if (message.sequenceNumber == null) { + throw new InvalidMessageError('StrictSigning: Signing required and no sequenceNumber was present') + } - const validatorFn = this.topicValidators.get(message.topic) - if (validatorFn != null) { - const result = await validatorFn(from, message) - if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) { - throw new InvalidMessageError('Message validation failed') + if (!(await verifySignature(message, this.encodeMessage.bind(this)))) { + this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_signature') + throw new InvalidMessageError('StrictSigning: Invalid message signature') + } + + break + default: + throw new InvalidMessageError('Cannot validate message: unhandled signature policy') + } + + const validatorFn = this.topicValidators.get(message.topic) + if (validatorFn != null) { + const result = await validatorFn(from, message) + if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) { + this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_topic') + throw new InvalidMessageError('Message validation failed') + } } + + // Track successful validation + this.components.metrics?.increment('libp2p_pubsub_message_validation_success') + const validationTime = performance.now() - validationStart + this.components.metrics?.histogram('libp2p_pubsub_message_validation_time', validationTime) + + } catch (err) { + // Track failed validation + this.components.metrics?.increment('libp2p_pubsub_message_validation_failed_total') + const validationTime = performance.now() - validationStart + this.components.metrics?.histogram('libp2p_pubsub_message_validation_time', validationTime) + throw err } } @@ -772,4 +846,8 @@ export abstract class PubSubBaseProtocol = Pu return Array.from(this.peers.keys()) } + + // Add rate limiting state + private _peerMessageCounts: Map = new Map() + private _rateLimitInterval: ReturnType }