Skip to content

Commit 52b3b1a

Browse files
authored
feat: add reprovide (#2785)
Adds a reprovide queue to kad-dht to republish provider records two hours before they would expire. Adds a function to the content routing interface to cancel the reprovide.
1 parent c4399dc commit 52b3b1a

38 files changed

+999
-556
lines changed

packages/integration-tests/test/fixtures/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ export class MockContentRouting implements ContentRouting {
166166
MockContentRouting.providers.set(cid.toString(), providers)
167167
}
168168

169+
async cancelReprovide (): Promise<void> {
170+
171+
}
172+
169173
async * findProviders (cid: CID<unknown, number, number, Version>, options?: AbortOptions | undefined): AsyncGenerator<PeerInfo, void, undefined> {
170174
yield * MockContentRouting.providers.get(cid.toString()) ?? []
171175
}

packages/interface/src/content-routing/index.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { RoutingOptions } from '../index.js'
1+
import type { AbortOptions, RoutingOptions } from '../index.js'
22
import type { PeerInfo } from '../peer-info/index.js'
33
import type { CID } from 'multiformats/cid'
44

@@ -45,6 +45,13 @@ export interface ContentRouting {
4545
*/
4646
provide(cid: CID, options?: RoutingOptions): Promise<void>
4747

48+
/**
49+
* If network peers need to be periodically reminded that the caller can
50+
* provide content corresponding to the passed CID, call this function to no
51+
* longer remind them.
52+
*/
53+
cancelReprovide (key: CID, options?: AbortOptions): Promise<void>
54+
4855
/**
4956
* Find the providers of the passed CID.
5057
*

packages/kad-dht/README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,42 @@ const peerInfo = await node.peerRouting.findPeer(peerId)
102102
console.info(peerInfo) // peer id, multiaddrs
103103
```
104104

105+
## Example - Connecting to both a LAN-only DHT and the IPFS Amino DHT
106+
107+
When using multiple DHTs, you should specify distinct datastore, metrics and
108+
log prefixes to ensure that data is kept separate for each instance.
109+
110+
```TypeScript
111+
import { kadDHT, removePublicAddressesMapper, removePrivateAddressesMapper } from '@libp2p/kad-dht'
112+
import { createLibp2p } from 'libp2p'
113+
import { peerIdFromString } from '@libp2p/peer-id'
114+
115+
const node = await createLibp2p({
116+
services: {
117+
lanDHT: kadDHT({
118+
protocol: '/ipfs/lan/kad/1.0.0',
119+
peerInfoMapper: removePublicAddressesMapper,
120+
clientMode: false,
121+
logPrefix: 'libp2p:dht-lan',
122+
datastorePrefix: '/dht-lan',
123+
metricsPrefix: 'libp2p_dht_lan'
124+
}),
125+
aminoDHT: kadDHT({
126+
protocol: '/ipfs/kad/1.0.0',
127+
peerInfoMapper: removePrivateAddressesMapper,
128+
logPrefix: 'libp2p:dht-amino',
129+
datastorePrefix: '/dht-amino',
130+
metricsPrefix: 'libp2p_dht_amino'
131+
})
132+
}
133+
})
134+
135+
const peerId = peerIdFromString('QmFoo')
136+
const peerInfo = await node.peerRouting.findPeer(peerId)
137+
138+
console.info(peerInfo) // peer id, multiaddrs
139+
```
140+
105141
# Install
106142

107143
```console

packages/kad-dht/package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
"@libp2p/utils": "^6.1.2",
6767
"@multiformats/multiaddr": "^12.2.3",
6868
"any-signal": "^4.1.1",
69-
"hashlru": "^2.3.0",
7069
"interface-datastore": "^8.3.0",
7170
"it-drain": "^3.0.7",
7271
"it-length": "^3.0.6",
@@ -77,10 +76,10 @@
7776
"it-pipe": "^3.0.1",
7877
"it-protobuf-stream": "^1.1.3",
7978
"it-take": "^3.0.5",
79+
"mortice": "^3.0.4",
8080
"multiformats": "^13.1.0",
8181
"p-defer": "^4.0.1",
8282
"p-event": "^6.0.1",
83-
"p-queue": "^8.0.1",
8483
"progress-events": "^1.0.0",
8584
"protons-runtime": "^5.4.0",
8685
"race-signal": "^1.0.2",
@@ -98,7 +97,6 @@
9897
"@types/which": "^3.0.3",
9998
"aegir": "^44.0.1",
10099
"datastore-core": "^10.0.0",
101-
"datastore-level": "^11.0.0",
102100
"delay": "^6.0.0",
103101
"execa": "^9.1.0",
104102
"it-all": "^3.0.6",

packages/kad-dht/src/constants.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@ export const MAX_RECORD_AGE = 36 * hour
1515

1616
export const PROTOCOL = '/ipfs/kad/1.0.0'
1717

18-
export const RECORD_KEY_PREFIX = '/dht/record'
18+
export const PROVIDERS_VALIDITY = 24 * hour
1919

20-
export const PROVIDER_KEY_PREFIX = '/dht/provider'
20+
export const PROVIDERS_CLEANUP_INTERVAL = hour
2121

22-
export const PROVIDERS_LRU_CACHE_SIZE = 256
22+
// Re-run the provide operation when the expiry of our provider records is within this amount
23+
export const REPROVIDE_THRESHOLD = 2 * hour
2324

24-
export const PROVIDERS_VALIDITY = 24 * hour
25+
// How many reprovide operations to run at once
26+
export const REPROVIDE_CONCURRENCY = 10
2527

26-
export const PROVIDERS_CLEANUP_INTERVAL = hour
28+
// How long to let the reprovide queue grow before we wait for capacity
29+
export const REPROVIDE_MAX_QUEUE_SIZE = 16_384
30+
31+
// How often to check if records need reproviding
32+
export const REPROVIDE_INTERVAL = hour
2733

2834
export const READ_MESSAGE_TIMEOUT = 10 * second
2935

packages/kad-dht/src/content-fetching/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ export class ContentFetching {
4141
private readonly peerRouting: PeerRouting
4242
private readonly queryManager: QueryManager
4343
private readonly network: Network
44+
private readonly datastorePrefix: string
4445

4546
constructor (components: KadDHTComponents, init: ContentFetchingInit) {
4647
const { validators, selectors, peerRouting, queryManager, network, logPrefix } = init
4748

4849
this.components = components
4950
this.log = components.logger.forComponent(`${logPrefix}:content-fetching`)
51+
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
5052
this.validators = validators
5153
this.selectors = selectors
5254
this.peerRouting = peerRouting
@@ -61,7 +63,7 @@ export class ContentFetching {
6163
async getLocal (key: Uint8Array): Promise<Libp2pRecord> {
6264
this.log('getLocal %b', key)
6365

64-
const dsKey = bufferToRecordKey(key)
66+
const dsKey = bufferToRecordKey(this.datastorePrefix, key)
6567

6668
this.log('fetching record for key %k', dsKey)
6769

@@ -92,7 +94,7 @@ export class ContentFetching {
9294
// correct ourself
9395
if (this.components.peerId.equals(from)) {
9496
try {
95-
const dsKey = bufferToRecordKey(key)
97+
const dsKey = bufferToRecordKey(this.datastorePrefix, key)
9698
this.log(`Storing corrected record for key ${dsKey.toString()}`)
9799
await this.components.datastore.put(dsKey, fixupRec.subarray())
98100
} catch (err: any) {
@@ -136,7 +138,7 @@ export class ContentFetching {
136138
const record = createPutRecord(key, value)
137139

138140
// store the record locally
139-
const dsKey = bufferToRecordKey(key)
141+
const dsKey = bufferToRecordKey(this.datastorePrefix, key)
140142
this.log(`storing record for key ${dsKey.toString()}`)
141143
await this.components.datastore.put(dsKey, record.subarray())
142144

packages/kad-dht/src/index.ts

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,47 @@
7878
*
7979
* console.info(peerInfo) // peer id, multiaddrs
8080
* ```
81+
*
82+
* @example Connecting to both a LAN-only DHT and the IPFS Amino DHT
83+
*
84+
* When using multiple DHTs, you should specify distinct datastore, metrics and
85+
* log prefixes to ensure that data is kept separate for each instance.
86+
*
87+
* ```TypeScript
88+
* import { kadDHT, removePublicAddressesMapper, removePrivateAddressesMapper } from '@libp2p/kad-dht'
89+
* import { createLibp2p } from 'libp2p'
90+
* import { peerIdFromString } from '@libp2p/peer-id'
91+
*
92+
* const node = await createLibp2p({
93+
* services: {
94+
* lanDHT: kadDHT({
95+
* protocol: '/ipfs/lan/kad/1.0.0',
96+
* peerInfoMapper: removePublicAddressesMapper,
97+
* clientMode: false,
98+
* logPrefix: 'libp2p:dht-lan',
99+
* datastorePrefix: '/dht-lan',
100+
* metricsPrefix: 'libp2p_dht_lan'
101+
* }),
102+
* aminoDHT: kadDHT({
103+
* protocol: '/ipfs/kad/1.0.0',
104+
* peerInfoMapper: removePrivateAddressesMapper,
105+
* logPrefix: 'libp2p:dht-amino',
106+
* datastorePrefix: '/dht-amino',
107+
* metricsPrefix: 'libp2p_dht_amino'
108+
* })
109+
* }
110+
* })
111+
*
112+
* const peerId = peerIdFromString('QmFoo')
113+
* const peerInfo = await node.peerRouting.findPeer(peerId)
114+
*
115+
* console.info(peerInfo) // peer id, multiaddrs
116+
* ```
81117
*/
82118

83119
import { KadDHT as KadDHTClass } from './kad-dht.js'
84120
import { MessageType } from './message/dht.js'
85121
import { removePrivateAddressesMapper, removePublicAddressesMapper, passthroughMapper } from './utils.js'
86-
import type { ProvidersInit } from './providers.js'
87122
import type { Libp2pEvents, ComponentLogger, TypedEventTarget, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions, PrivateKey } from '@libp2p/interface'
88123
import type { AddressManager, ConnectionManager, Registrar } from '@libp2p/interface-internal'
89124
import type { AdaptiveTimeoutInit } from '@libp2p/utils/src/adaptive-timeout.js'
@@ -251,6 +286,12 @@ export interface KadDHT {
251286
*/
252287
provide(key: CID, options?: RoutingOptions): AsyncIterable<QueryEvent>
253288

289+
/**
290+
* Provider records must be re-published every 24 hours - pass a previously
291+
* provided CID here to not re-publish a record for it any more
292+
*/
293+
cancelReprovide(key: CID): Promise<void>
294+
254295
/**
255296
* Store the passed value under the passed key on the DHT
256297
*/
@@ -299,7 +340,61 @@ export type Selectors = Record<string, SelectFn>
299340
*/
300341
export type Validators = Record<string, ValidateFn>
301342

302-
export type { ProvidersInit }
343+
export interface ProvidersInit {
344+
/**
345+
* @default 256
346+
*/
347+
cacheSize?: number
348+
/**
349+
* How often invalid records are cleaned. (in seconds)
350+
*
351+
* @default 5400
352+
*/
353+
cleanupInterval?: number
354+
/**
355+
* How long is a provider valid for. (in seconds)
356+
*
357+
* @default 86400
358+
*/
359+
provideValidity?: number
360+
}
361+
362+
export interface ReProvideInit {
363+
/**
364+
* How many re-provide operations to run simultaneously
365+
*
366+
* @default 10
367+
*/
368+
concurrency?: number
369+
370+
/**
371+
* How long to let the re-provide queue grow
372+
*
373+
* @default 16384
374+
*/
375+
maxQueueSize?: number
376+
377+
/**
378+
* How long before the record expiry to re-provide in ms
379+
*
380+
* @default 7200000
381+
*/
382+
threshold?: number
383+
384+
/**
385+
* How often to check which records need reproviding in ms
386+
*
387+
* @default 3600000
388+
*/
389+
interval?: number
390+
391+
/**
392+
* How long provider records are valid for in ms
393+
*
394+
* @default 86400000
395+
*/
396+
validity?: number
397+
}
303398

304399
export interface KadDHTInit {
305400
/**
@@ -396,6 +491,20 @@ export interface KadDHTInit {
396491
*/
397492
logPrefix?: string
398493

494+
/**
495+
* The datastore prefix to use
496+
*
497+
* @default "/dht"
498+
*/
499+
datastorePrefix?: string
500+
501+
/**
502+
* The metrics prefix to use
503+
*
504+
* @default "libp2p_kad_dht"
505+
*/
506+
metricsPrefix?: string
507+
399508
/**
400509
* Settings for how long to wait in ms when pinging DHT peers to decide if
401510
* they should be evicted from the routing table or not.
@@ -459,6 +568,11 @@ export interface KadDHTInit {
459568
*/
460569
providers?: ProvidersInit
461570

571+
/**
572+
* Initialization options for the Reprovider component
573+
*/
574+
reprovide?: ReProvideInit
575+
462576
/**
463577
* For every incoming and outgoing PeerInfo, override address configuration
464578
* with this filter.

0 commit comments

Comments
 (0)