33const errcode = require ( 'err-code' )
44const pTimeout = require ( 'p-timeout' )
55const uint8ArrayEquals = require ( 'uint8arrays/equals' )
6+ const uint8ArrayToString = require ( 'uint8arrays/to-string' )
67const libp2pRecord = require ( 'libp2p-record' )
7-
88const c = require ( '../constants' )
99const Query = require ( '../query' )
10-
1110const utils = require ( '../utils' )
12-
1311const Record = libp2pRecord . Record
1412
13+ /**
14+ * @typedef {import('peer-id') } PeerId
15+ * @typedef {import('../query').DHTQueryResult } DHTQueryResult
16+ */
17+
18+ /**
19+ * @param {import('../') } dht
20+ */
1521module . exports = ( dht ) => {
22+ /**
23+ * @param {Uint8Array } key
24+ * @param {Uint8Array } rec
25+ */
1626 const putLocal = async ( key , rec ) => { // eslint-disable-line require-await
1727 return dht . datastore . put ( utils . bufferToKey ( key ) , rec )
1828 }
@@ -22,30 +32,26 @@ module.exports = (dht) => {
2232 * the local datastore.
2333 *
2434 * @param {Uint8Array } key
25- * @returns {Promise<Record> }
26- *
27- * @private
2835 */
2936 const getLocal = async ( key ) => {
30- dht . _log ( ' getLocal %b' , key )
37+ dht . _log ( ` getLocal ${ uint8ArrayToString ( key , 'base32' ) } ` )
3138
3239 const raw = await dht . datastore . get ( utils . bufferToKey ( key ) )
33- dht . _log ( 'found %b in local datastore' , key )
40+ dht . _log ( `found ${ uint8ArrayToString ( key , 'base32' ) } in local datastore` )
41+
3442 const rec = Record . deserialize ( raw )
3543
3644 await dht . _verifyRecordLocally ( rec )
45+
3746 return rec
3847 }
3948
4049 /**
4150 * Send the best record found to any peers that have an out of date record.
4251 *
4352 * @param {Uint8Array } key
44- * @param {Array<Object> } vals - values retrieved from the DHT
45- * @param {Object } best - the best record that was found
46- * @returns {Promise }
47- *
48- * @private
53+ * @param {import('../query').DHTQueryValue[] } vals - values retrieved from the DHT
54+ * @param {Uint8Array } best - the best record that was found
4955 */
5056 const sendCorrectionRecord = async ( key , vals , best ) => {
5157 const fixupRec = await utils . createPutRecord ( key , best )
@@ -78,10 +84,9 @@ module.exports = (dht) => {
7884 return {
7985 /**
8086 * Store the given key/value pair locally, in the datastore.
87+ *
8188 * @param {Uint8Array } key
8289 * @param {Uint8Array } rec - encoded record
83- * @returns {Promise<void> }
84- * @private
8590 */
8691 async _putLocal ( key , rec ) { // eslint-disable-line require-await
8792 return putLocal ( key , rec )
@@ -92,9 +97,8 @@ module.exports = (dht) => {
9297 *
9398 * @param {Uint8Array } key
9499 * @param {Uint8Array } value
95- * @param {Object } [options] - put options
100+ * @param {object } [options] - put options
96101 * @param {number } [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
97- * @returns {Promise<void> }
98102 */
99103 async put ( key , value , options = { } ) {
100104 dht . _log ( 'PutValue %b' , key )
@@ -134,9 +138,8 @@ module.exports = (dht) => {
134138 * Times out after 1 minute by default.
135139 *
136140 * @param {Uint8Array } key
137- * @param {Object } [options] - get options
141+ * @param {object } [options] - get options
138142 * @param {number } [options.timeout] - optional timeout (default: 60000)
139- * @returns {Promise<Uint8Array> }
140143 */
141144 async get ( key , options = { } ) {
142145 options . timeout = options . timeout || c . minute
@@ -173,16 +176,15 @@ module.exports = (dht) => {
173176 *
174177 * @param {Uint8Array } key
175178 * @param {number } nvals
176- * @param {Object } [options] - get options
179+ * @param {object } [options] - get options
177180 * @param {number } [options.timeout] - optional timeout (default: 60000)
178- * @returns {Promise<Array<{from: PeerId, val: Uint8Array}>> }
179181 */
180182 async getMany ( key , nvals , options = { } ) {
181183 options . timeout = options . timeout || c . minute
182184
183185 dht . _log ( 'getMany %b (%s)' , key , nvals )
184186
185- let vals = [ ]
187+ const vals = [ ]
186188 let localRec
187189
188190 try {
@@ -204,9 +206,8 @@ module.exports = (dht) => {
204206 return vals
205207 }
206208
207- const paths = [ ]
208209 const id = await utils . convertBuffer ( key )
209- const rtp = dht . routingTable . closestPeers ( id , this . kBucketSize )
210+ const rtp = dht . routingTable . closestPeers ( id , dht . kBucketSize )
210211
211212 dht . _log ( 'peers in rt: %d' , rtp . length )
212213
@@ -220,15 +221,23 @@ module.exports = (dht) => {
220221 return vals
221222 }
222223
223- // we have peers, lets do the actual query to them
224- const query = new Query ( dht , key , ( pathIndex , numPaths ) => {
225- // This function body runs once per disjoint path
226- const pathSize = utils . pathSize ( nvals - vals . length , numPaths )
227- const pathVals = [ ]
228- paths . push ( pathVals )
224+ const valsLength = vals . length
229225
230- // Here we return the query function to use on this particular disjoint path
231- return async ( peer ) => {
226+ /**
227+ * @param {number } pathIndex
228+ * @param {number } numPaths
229+ */
230+ function createQuery ( pathIndex , numPaths ) {
231+ // This function body runs once per disjoint path
232+ const pathSize = utils . pathSize ( nvals - valsLength , numPaths )
233+ let queryResults = 0
234+
235+ /**
236+ * Here we return the query function to use on this particular disjoint path
237+ *
238+ * @param {PeerId } peer
239+ */
240+ async function disjointPathQuery ( peer ) {
232241 let rec , peers , lookupErr
233242 try {
234243 const results = await dht . _getValueOrPeers ( peer , key )
@@ -242,37 +251,49 @@ module.exports = (dht) => {
242251 lookupErr = err
243252 }
244253
245- const res = { closerPeers : peers }
254+ /** @type {import('../query').QueryResult } */
255+ const res = {
256+ closerPeers : peers
257+ }
258+
259+ if ( rec && rec . value ) {
260+ vals . push ( {
261+ val : rec . value ,
262+ from : peer
263+ } )
246264
247- if ( ( rec && rec . value ) || lookupErr ) {
248- pathVals . push ( {
249- val : rec && rec . value ,
265+ queryResults ++
266+ } else if ( lookupErr ) {
267+ vals . push ( {
268+ err : lookupErr ,
250269 from : peer
251270 } )
271+
272+ queryResults ++
252273 }
253274
254275 // enough is enough
255- if ( pathVals . length >= pathSize ) {
276+ if ( queryResults >= pathSize ) {
256277 res . pathComplete = true
257278 }
258279
259280 return res
260281 }
261- } )
262282
263- let error
264- try {
265- await pTimeout ( query . run ( rtp ) , options . timeout )
266- } catch ( err ) {
267- error = err
283+ return disjointPathQuery
268284 }
269- query . stop ( )
270285
271- // combine vals from each path
272- vals = [ ] . concat . apply ( vals , paths ) . slice ( 0 , nvals )
286+ // we have peers, lets send the actual query to them
287+ const query = new Query ( dht , key , createQuery )
273288
274- if ( error && vals . length === 0 ) {
275- throw error
289+ try {
290+ await pTimeout ( query . run ( rtp ) , options . timeout )
291+ } catch ( err ) {
292+ if ( vals . length === 0 ) {
293+ throw err
294+ }
295+ } finally {
296+ query . stop ( )
276297 }
277298
278299 return vals
0 commit comments