@@ -104,118 +104,124 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
104104 log . error ( 'error during query - %e' , evt . detail )
105105 } )
106106
107- signal . addEventListener ( 'abort' , ( ) => {
107+ const onAbort = ( ) : void => {
108108 queue . abort ( )
109109 events . end ( new AbortError ( ) )
110- } )
110+ }
111111
112- // perform lookups on kadId, not the actual value
113- const kadId = await convertBuffer ( key , {
114- signal
115- } )
112+ signal . addEventListener ( 'abort' , onAbort )
116113
117- /**
118- * Adds the passed peer to the query queue if it's not us and no other path
119- * has passed through this peer
120- */
121- function queryPeer ( peer : PeerInfo , peerKadId : Uint8Array ) : void {
122- if ( peer == null ) {
123- return
124- }
125-
126- peersSeen . add ( peer . id . toMultihash ( ) . bytes )
127-
128- const peerXor = uint8ArrayXor ( peerKadId , kadId )
129-
130- queue . add ( async ( ) => {
131- try {
132- for await ( const event of query ( {
133- ...options ,
134- key,
135- peer,
136- path : {
137- index : path ,
138- queued : queue . queued ,
139- running : queue . running ,
140- total : queue . size
141- } ,
142- numPaths,
143- peerKadId,
144- signal
145- } ) ) {
146- // if there are closer peers and the query has not completed, continue the query
147- if ( event . name === 'PEER_RESPONSE' ) {
148- for ( const closerPeer of event . closer ) {
149- if ( peersSeen . has ( closerPeer . id . toMultihash ( ) . bytes ) ) { // eslint-disable-line max-depth
150- log ( 'already seen %p in query' , closerPeer . id )
151- continue
152- }
114+ try {
115+ // perform lookups on kadId, not the actual value
116+ const kadId = await convertBuffer ( key , {
117+ signal
118+ } )
153119
154- if ( ourPeerId . equals ( closerPeer . id ) ) { // eslint-disable-line max-depth
155- log ( 'not querying ourselves' )
156- continue
157- }
120+ /**
121+ * Adds the passed peer to the query queue if it's not us and no other path
122+ * has passed through this peer
123+ */
124+ function queryPeer ( peer : PeerInfo , peerKadId : Uint8Array ) : void {
125+ if ( peer == null ) {
126+ return
127+ }
158128
159- if ( ! ( await connectionManager . isDialable ( closerPeer . multiaddrs ) ) ) { // eslint-disable-line max-depth
160- log ( 'not querying undialable peer' )
161- continue
162- }
129+ peersSeen . add ( peer . id . toMultihash ( ) . bytes )
163130
164- const closerPeerKadId = await convertPeerId ( closerPeer . id , {
165- signal
166- } )
167- const closerPeerXor = uint8ArrayXor ( closerPeerKadId , kadId )
131+ const peerXor = uint8ArrayXor ( peerKadId , kadId )
168132
169- // only continue query if closer peer is actually closer
170- if ( uint8ArrayXorCompare ( closerPeerXor , peerXor ) !== - 1 ) { // eslint-disable-line max-depth
171- log ( 'skipping %p as they are not closer to %b than %p' , closerPeer . id , key , peer )
172- continue
133+ queue . add ( async ( ) => {
134+ try {
135+ for await ( const event of query ( {
136+ ...options ,
137+ key,
138+ peer,
139+ path : {
140+ index : path ,
141+ queued : queue . queued ,
142+ running : queue . running ,
143+ total : queue . size
144+ } ,
145+ numPaths,
146+ peerKadId,
147+ signal
148+ } ) ) {
149+ // if there are closer peers and the query has not completed, continue the query
150+ if ( event . name === 'PEER_RESPONSE' ) {
151+ for ( const closerPeer of event . closer ) {
152+ if ( peersSeen . has ( closerPeer . id . toMultihash ( ) . bytes ) ) { // eslint-disable-line max-depth
153+ log ( 'already seen %p in query' , closerPeer . id )
154+ continue
155+ }
156+
157+ if ( ourPeerId . equals ( closerPeer . id ) ) { // eslint-disable-line max-depth
158+ log ( 'not querying ourselves' )
159+ continue
160+ }
161+
162+ if ( ! ( await connectionManager . isDialable ( closerPeer . multiaddrs ) ) ) { // eslint-disable-line max-depth
163+ log ( 'not querying undialable peer' )
164+ continue
165+ }
166+
167+ const closerPeerKadId = await convertPeerId ( closerPeer . id , {
168+ signal
169+ } )
170+ const closerPeerXor = uint8ArrayXor ( closerPeerKadId , kadId )
171+
172+ // only continue query if closer peer is actually closer
173+ if ( uint8ArrayXorCompare ( closerPeerXor , peerXor ) !== - 1 ) { // eslint-disable-line max-depth
174+ log ( 'skipping %p as they are not closer to %b than %p' , closerPeer . id , key , peer )
175+ continue
176+ }
177+
178+ log ( 'querying closer peer %p' , closerPeer . id )
179+ queryPeer ( closerPeer , closerPeerKadId )
173180 }
174-
175- log ( 'querying closer peer %p' , closerPeer . id )
176- queryPeer ( closerPeer , closerPeerKadId )
177181 }
178- }
179182
180- events . push ( {
181- ...event ,
183+ events . push ( {
184+ ...event ,
185+ path : {
186+ index : path ,
187+ queued : queue . queued ,
188+ running : queue . running ,
189+ total : queue . size
190+ }
191+ } )
192+ }
193+ } catch ( err : any ) {
194+ // yield error event if query is continuing
195+ events . push ( queryErrorEvent ( {
196+ from : peer . id ,
197+ error : err ,
182198 path : {
183199 index : path ,
184200 queued : queue . queued ,
185- running : queue . running ,
186- total : queue . size
201+ running : queue . running - 1 ,
202+ total : queue . size - 1
187203 }
188- } )
204+ } , options ) )
189205 }
190- } catch ( err : any ) {
191- // yield error event if query is continuing
192- events . push ( queryErrorEvent ( {
193- from : peer . id ,
194- error : err ,
195- path : {
196- index : path ,
197- queued : queue . queued ,
198- running : queue . running - 1 ,
199- total : queue . size - 1
200- }
201- } , options ) )
202- }
203- } , {
204- distance : peerXor
205- } ) . catch ( err => {
206- log . error ( 'error during query - %e' , err )
207- } )
208- }
209-
210- // begin the query with the starting peers
211- await Promise . all (
212- startingPeers . map ( async startingPeer => {
213- queryPeer ( { id : startingPeer , multiaddrs : [ ] } , await convertPeerId ( startingPeer , {
214- signal
215- } ) )
216- } )
217- )
206+ } , {
207+ distance : peerXor
208+ } ) . catch ( err => {
209+ log . error ( 'error during query - %e' , err )
210+ } )
211+ }
218212
219- // yield results as they come in
220- yield * events
213+ // begin the query with the starting peers
214+ await Promise . all (
215+ startingPeers . map ( async startingPeer => {
216+ queryPeer ( { id : startingPeer , multiaddrs : [ ] } , await convertPeerId ( startingPeer , {
217+ signal
218+ } ) )
219+ } )
220+ )
221+
222+ // yield results as they come in
223+ yield * events
224+ } finally {
225+ signal . removeEventListener ( 'abort' , onAbort )
226+ }
221227}
0 commit comments