11import { serviceCapabilities } from '@libp2p/interface'
22import { RecordEnvelope , PeerRecord } from '@libp2p/peer-record'
3+ import { debounce } from '@libp2p/utils/debounce'
34import { protocols } from '@multiformats/multiaddr'
45import drain from 'it-drain'
56import parallel from 'it-parallel'
@@ -9,7 +10,8 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
910import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
1011import {
1112 MULTICODEC_IDENTIFY_PUSH_PROTOCOL_NAME ,
12- MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION
13+ MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION ,
14+ PUSH_DEBOUNCE_MS
1315} from './consts.js'
1416import { Identify as IdentifyMessage } from './pb/message.js'
1517import { AbstractIdentify , consumeIdentifyMessage , defaultValues } from './utils.js'
@@ -20,6 +22,7 @@ import type { ConnectionManager } from '@libp2p/interface-internal'
2022export class IdentifyPush extends AbstractIdentify implements Startable , IdentifyPushInterface {
2123 private readonly connectionManager : ConnectionManager
2224 private readonly concurrency : number
25+ private _push : ( ) => void
2326
2427 constructor ( components : IdentifyPushComponents , init : IdentifyPushInit = { } ) {
2528 super ( components , {
@@ -31,10 +34,14 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
3134 this . connectionManager = components . connectionManager
3235 this . concurrency = init . concurrency ?? defaultValues . concurrency
3336
37+ this . _push = debounce ( this . sendPushMessage . bind ( this ) , init . debounce ?? PUSH_DEBOUNCE_MS )
38+
3439 if ( ( init . runOnSelfUpdate ?? defaultValues . runOnSelfUpdate ) ) {
3540 // When self peer record changes, trigger identify-push
3641 components . events . addEventListener ( 'self:peer:update' , ( evt ) => {
37- void this . push ( ) . catch ( err => { this . log . error ( err ) } )
42+ this . push ( ) . catch ( err => {
43+ this . log . error ( 'error pushing updates to peers - %e' , err )
44+ } )
3845 } )
3946 }
4047 }
@@ -47,72 +54,80 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
4754 * Calls `push` on all peer connections
4855 */
4956 async push ( ) : Promise < void > {
57+ this . _push ( )
58+ }
59+
60+ private async sendPushMessage ( ) : Promise < void > {
5061 // Do not try to push if we are not running
5162 if ( ! this . isStarted ( ) ) {
5263 return
5364 }
5465
55- const listenAddresses = this . addressManager . getAddresses ( ) . map ( ma => ma . decapsulateCode ( protocols ( 'p2p' ) . code ) )
56- const peerRecord = new PeerRecord ( {
57- peerId : this . peerId ,
58- multiaddrs : listenAddresses
59- } )
60- const signedPeerRecord = await RecordEnvelope . seal ( peerRecord , this . privateKey )
61- const supportedProtocols = this . registrar . getProtocols ( )
62- const peer = await this . peerStore . get ( this . peerId )
63- const agentVersion = uint8ArrayToString ( peer . metadata . get ( 'AgentVersion' ) ?? uint8ArrayFromString ( this . host . agentVersion ) )
64- const protocolVersion = uint8ArrayToString ( peer . metadata . get ( 'ProtocolVersion' ) ?? uint8ArrayFromString ( this . host . protocolVersion ) )
65- const self = this
66-
67- async function * pushToConnections ( ) : AsyncGenerator < ( ) => Promise < void > > {
68- for ( const connection of self . connectionManager . getConnections ( ) ) {
69- const peer = await self . peerStore . get ( connection . remotePeer )
70-
71- if ( ! peer . protocols . includes ( self . protocol ) ) {
72- continue
73- }
66+ try {
67+ const listenAddresses = this . addressManager . getAddresses ( ) . map ( ma => ma . decapsulateCode ( protocols ( 'p2p' ) . code ) )
68+ const peerRecord = new PeerRecord ( {
69+ peerId : this . peerId ,
70+ multiaddrs : listenAddresses
71+ } )
72+ const signedPeerRecord = await RecordEnvelope . seal ( peerRecord , this . privateKey )
73+ const supportedProtocols = this . registrar . getProtocols ( )
74+ const peer = await this . peerStore . get ( this . peerId )
75+ const agentVersion = uint8ArrayToString ( peer . metadata . get ( 'AgentVersion' ) ?? uint8ArrayFromString ( this . host . agentVersion ) )
76+ const protocolVersion = uint8ArrayToString ( peer . metadata . get ( 'ProtocolVersion' ) ?? uint8ArrayFromString ( this . host . protocolVersion ) )
77+ const self = this
78+
79+ async function * pushToConnections ( ) : AsyncGenerator < ( ) => Promise < void > > {
80+ for ( const connection of self . connectionManager . getConnections ( ) ) {
81+ const peer = await self . peerStore . get ( connection . remotePeer )
82+
83+ if ( ! peer . protocols . includes ( self . protocol ) ) {
84+ continue
85+ }
7486
75- yield async ( ) => {
76- let stream : Stream | undefined
77- const signal = AbortSignal . timeout ( self . timeout )
78-
79- setMaxListeners ( Infinity , signal )
80-
81- try {
82- stream = await connection . newStream ( self . protocol , {
83- signal,
84- runOnLimitedConnection : self . runOnLimitedConnection
85- } )
86-
87- const pb = pbStream ( stream , {
88- maxDataLength : self . maxMessageSize
89- } ) . pb ( IdentifyMessage )
90-
91- await pb . write ( {
92- listenAddrs : listenAddresses . map ( ma => ma . bytes ) ,
93- signedPeerRecord : signedPeerRecord . marshal ( ) ,
94- protocols : supportedProtocols ,
95- agentVersion,
96- protocolVersion
97- } , {
98- signal
99- } )
100-
101- await stream . close ( {
102- signal
103- } )
104- } catch ( err : any ) {
105- // Just log errors
106- self . log . error ( 'could not push identify update to peer' , err )
107- stream ?. abort ( err )
87+ yield async ( ) => {
88+ let stream : Stream | undefined
89+ const signal = AbortSignal . timeout ( self . timeout )
90+
91+ setMaxListeners ( Infinity , signal )
92+
93+ try {
94+ stream = await connection . newStream ( self . protocol , {
95+ signal,
96+ runOnLimitedConnection : self . runOnLimitedConnection
97+ } )
98+
99+ const pb = pbStream ( stream , {
100+ maxDataLength : self . maxMessageSize
101+ } ) . pb ( IdentifyMessage )
102+
103+ await pb . write ( {
104+ listenAddrs : listenAddresses . map ( ma => ma . bytes ) ,
105+ signedPeerRecord : signedPeerRecord . marshal ( ) ,
106+ protocols : supportedProtocols ,
107+ agentVersion,
108+ protocolVersion
109+ } , {
110+ signal
111+ } )
112+
113+ await stream . close ( {
114+ signal
115+ } )
116+ } catch ( err : any ) {
117+ // Just log errors
118+ self . log . error ( 'could not push identify update to peer' , err )
119+ stream ?. abort ( err )
120+ }
108121 }
109122 }
110123 }
111- }
112124
113- await drain ( parallel ( pushToConnections ( ) , {
114- concurrency : this . concurrency
115- } ) )
125+ await drain ( parallel ( pushToConnections ( ) , {
126+ concurrency : this . concurrency
127+ } ) )
128+ } catch ( err : any ) {
129+ this . log . error ( 'error pushing updates to peers - %e' , err )
130+ }
116131 }
117132
118133 /**
0 commit comments