Skip to content

Commit 27b2fa6

Browse files
authored
fix: track closest peers separately from main routing table (#2748)
The routing table is a balance trie where the path to the leaf node storing the contact is derived from the prefix of the kad id of the contact. This makes it great for starting a query because we can quickly find contacts in the kad-vicinity of the target, but it's less good for knowing peers that are in our kad-vicinity, since the bits that make us kad-close to another peer might not be in the prefix. Instead, use a peer distance list that we update whenever a peer successfully completes a `PING` operation. Periodically check this list and tag the closes peers with `KEEP_ALIVE` so we maintain connections to them, which will ensure we propagate changes in our PeerInfo to those peers most likely to answer `FIND_PEER` queries for our data.
1 parent 661d658 commit 27b2fa6

File tree

10 files changed

+308
-213
lines changed

10 files changed

+308
-213
lines changed

packages/kad-dht/src/peer-list/peer-distance-list.ts renamed to packages/kad-dht/src/peer-distance-list.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
22
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
3-
import { convertPeerId } from '../utils.js'
3+
import { convertPeerId } from './utils.js'
44
import type { PeerId, PeerInfo } from '@libp2p/interface'
55

66
interface PeerDistance {

packages/kad-dht/src/peer-list/index.ts

Lines changed: 0 additions & 54 deletions
This file was deleted.

packages/kad-dht/src/peer-routing/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { Libp2pRecord } from '@libp2p/record'
55
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
66
import { QueryError, InvalidRecordError } from '../errors.js'
77
import { MessageType } from '../message/dht.js'
8-
import { PeerDistanceList } from '../peer-list/peer-distance-list.js'
8+
import { PeerDistanceList } from '../peer-distance-list.js'
99
import {
1010
queryErrorEvent,
1111
finalPeerEvent,
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { KEEP_ALIVE } from '@libp2p/interface'
2+
import { PeerSet } from '@libp2p/peer-collections'
3+
import { PeerDistanceList } from '../peer-distance-list.js'
4+
import { convertPeerId } from '../utils.js'
5+
import type { RoutingTable } from './index.js'
6+
import type { ComponentLogger, Logger, Metrics, PeerId, PeerStore, Startable } from '@libp2p/interface'
7+
8+
export const PEER_SET_SIZE = 20
9+
export const REFRESH_INTERVAL = 5000
10+
export const KAD_CLOSE_TAG_NAME = 'kad-close'
11+
export const KAD_CLOSE_TAG_VALUE = 50
12+
13+
export interface ClosestPeersInit {
14+
logPrefix: string
15+
routingTable: RoutingTable
16+
peerSetSize?: number
17+
refreshInterval?: number
18+
closeTagName?: string
19+
closeTagValue?: number
20+
}
21+
22+
export interface ClosestPeersComponents {
23+
peerId: PeerId
24+
peerStore: PeerStore
25+
metrics?: Metrics
26+
logger: ComponentLogger
27+
}
28+
29+
/**
30+
* Contains a list of the kad-closest peers encountered on the network.
31+
*
32+
* Once every few seconds, if the list has changed, it tags the closest peers.
33+
*/
34+
export class ClosestPeers implements Startable {
35+
private readonly routingTable: RoutingTable
36+
private readonly components: ClosestPeersComponents
37+
private closestPeers: PeerSet
38+
private newPeers?: PeerDistanceList
39+
private readonly refreshInterval: number
40+
private readonly peerSetSize: number
41+
private timeout?: ReturnType<typeof setTimeout>
42+
private readonly closeTagName: string
43+
private readonly closeTagValue: number
44+
private readonly log: Logger
45+
46+
constructor (components: ClosestPeersComponents, init: ClosestPeersInit) {
47+
this.components = components
48+
this.log = components.logger.forComponent(`${init.logPrefix}:routing-table`)
49+
this.routingTable = init.routingTable
50+
this.refreshInterval = init.refreshInterval ?? REFRESH_INTERVAL
51+
this.peerSetSize = init.peerSetSize ?? PEER_SET_SIZE
52+
this.closeTagName = init.closeTagName ?? KAD_CLOSE_TAG_NAME
53+
this.closeTagValue = init.closeTagValue ?? KAD_CLOSE_TAG_VALUE
54+
55+
this.closestPeers = new PeerSet()
56+
this.onPeerPing = this.onPeerPing.bind(this)
57+
}
58+
59+
async start (): Promise<void> {
60+
const targetKadId = await convertPeerId(this.components.peerId)
61+
this.newPeers = new PeerDistanceList(targetKadId, this.peerSetSize)
62+
this.routingTable.addEventListener('peer:ping', this.onPeerPing)
63+
64+
this.timeout = setInterval(() => {
65+
this.updatePeerTags()
66+
.catch(err => {
67+
this.log.error('error updating peer tags - %e', err)
68+
})
69+
}, this.refreshInterval)
70+
}
71+
72+
stop (): void {
73+
this.routingTable.removeEventListener('peer:ping', this.onPeerPing)
74+
clearTimeout(this.timeout)
75+
}
76+
77+
onPeerPing (event: CustomEvent<PeerId>): void {
78+
this.newPeers?.add({ id: event.detail, multiaddrs: [] })
79+
.catch(err => {
80+
this.log.error('error adding peer to distance list - %e', err)
81+
})
82+
}
83+
84+
async updatePeerTags (): Promise<void> {
85+
const newClosest = new PeerSet(this.newPeers?.peers.map(peer => peer.id))
86+
const added = newClosest.difference(this.closestPeers)
87+
const removed = this.closestPeers.difference(newClosest)
88+
this.closestPeers = newClosest
89+
90+
await Promise.all([
91+
...[...added].map(async peerId => {
92+
await this.components.peerStore.merge(peerId, {
93+
tags: {
94+
[this.closeTagName]: {
95+
value: this.closeTagValue
96+
},
97+
[KEEP_ALIVE]: {
98+
value: 1
99+
}
100+
}
101+
})
102+
}),
103+
...[...removed].map(async peerId => {
104+
await this.components.peerStore.merge(peerId, {
105+
tags: {
106+
[this.closeTagName]: undefined,
107+
[KEEP_ALIVE]: undefined
108+
}
109+
})
110+
})
111+
])
112+
}
113+
}

packages/kad-dht/src/routing-table/index.ts

Lines changed: 32 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
1-
import { KEEP_ALIVE, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
1+
import { TypedEventEmitter, setMaxListeners, start, stop } from '@libp2p/interface'
22
import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
33
import { PeerQueue } from '@libp2p/utils/peer-queue'
44
import { anySignal } from 'any-signal'
55
import parallel from 'it-parallel'
66
import { EventTypes } from '../index.js'
77
import { MessageType } from '../message/dht.js'
88
import * as utils from '../utils.js'
9+
import { ClosestPeers } from './closest-peers.js'
910
import { KBucket, isLeafBucket } from './k-bucket.js'
1011
import type { Bucket, LeafBucket, Peer } from './k-bucket.js'
1112
import type { Network } from '../network.js'
12-
import type { AbortOptions, ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream, TagOptions } from '@libp2p/interface'
13+
import type { AbortOptions, ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface'
1314
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'
1415

15-
export const KAD_CLOSE_TAG_NAME = 'kad-close'
16-
export const KAD_CLOSE_TAG_VALUE = 50
1716
export const KBUCKET_SIZE = 20
18-
export const PREFIX_LENGTH = 7
17+
export const PREFIX_LENGTH = 8
1918
export const PING_NEW_CONTACT_TIMEOUT = 2000
2019
export const PING_NEW_CONTACT_CONCURRENCY = 20
2120
export const PING_NEW_CONTACT_MAX_QUEUE_SIZE = 100
@@ -50,6 +49,8 @@ export interface RoutingTableInit {
5049
populateFromDatastoreOnStart?: boolean
5150
populateFromDatastoreLimit?: number
5251
lastPingThreshold?: number
52+
closestPeerSetSize?: number
53+
closestPeerSetRefreshInterval?: number
5354
}
5455

5556
export interface RoutingTableComponents {
@@ -62,6 +63,7 @@ export interface RoutingTableComponents {
6263
export interface RoutingTableEvents {
6364
'peer:add': CustomEvent<PeerId>
6465
'peer:remove': CustomEvent<PeerId>
66+
'peer:ping': CustomEvent<PeerId>
6567
}
6668

6769
/**
@@ -71,6 +73,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
7173
public kBucketSize: number
7274
public kb: KBucket
7375
public network: Network
76+
private readonly closestPeerTagger: ClosestPeers
7477
private readonly log: Logger
7578
private readonly components: RoutingTableComponents
7679
private running: boolean
@@ -83,8 +86,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
8386
private readonly protocol: string
8487
private readonly peerTagName: string
8588
private readonly peerTagValue: number
86-
private readonly closeTagName: string
87-
private readonly closeTagValue: number
8889
private readonly metrics?: {
8990
routingTableSize: Metric
9091
routingTableKadBucketTotal: Metric
@@ -106,13 +107,10 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
106107
this.network = init.network
107108
this.peerTagName = init.peerTagName ?? KAD_PEER_TAG_NAME
108109
this.peerTagValue = init.peerTagValue ?? KAD_PEER_TAG_VALUE
109-
this.closeTagName = init.closeTagName ?? KAD_CLOSE_TAG_NAME
110-
this.closeTagValue = init.closeTagValue ?? KAD_CLOSE_TAG_VALUE
111110
this.pingOldContacts = this.pingOldContacts.bind(this)
112111
this.verifyNewContact = this.verifyNewContact.bind(this)
113112
this.peerAdded = this.peerAdded.bind(this)
114113
this.peerRemoved = this.peerRemoved.bind(this)
115-
this.peerMoved = this.peerMoved.bind(this)
116114
this.populateFromDatastoreOnStart = init.populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START
117115
this.populateFromDatastoreLimit = init.populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT
118116

@@ -149,8 +147,16 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
149147
ping: this.pingOldContacts,
150148
verify: this.verifyNewContact,
151149
onAdd: this.peerAdded,
152-
onRemove: this.peerRemoved,
153-
onMove: this.peerMoved
150+
onRemove: this.peerRemoved
151+
})
152+
153+
this.closestPeerTagger = new ClosestPeers(this.components, {
154+
logPrefix: init.logPrefix,
155+
routingTable: this,
156+
peerSetSize: init.closestPeerSetSize,
157+
refreshInterval: init.closestPeerSetRefreshInterval,
158+
closeTagName: init.closeTagName,
159+
closeTagValue: init.closeTagValue
154160
})
155161

156162
if (this.components.metrics != null) {
@@ -173,6 +179,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
173179
async start (): Promise<void> {
174180
this.running = true
175181

182+
await start(this.closestPeerTagger)
176183
await this.kb.addSelfPeer(this.components.peerId)
177184
}
178185

@@ -205,9 +212,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
205212
this.log('failed to add peer %p to routing table, removing kad-dht peer tags - %e')
206213
await this.components.peerStore.merge(peer.id, {
207214
tags: {
208-
[this.closeTagName]: undefined,
209-
[this.peerTagName]: undefined,
210-
[KEEP_ALIVE]: undefined
215+
[this.peerTagName]: undefined
211216
}
212217
})
213218
}
@@ -222,29 +227,19 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
222227

223228
async stop (): Promise<void> {
224229
this.running = false
230+
await stop(this.closestPeerTagger)
225231
this.pingOldContactQueue.abort()
226232
this.pingNewContactQueue.abort()
227233
}
228234

229235
private async peerAdded (peer: Peer, bucket: LeafBucket): Promise<void> {
230236
if (!this.components.peerId.equals(peer.peerId)) {
231-
const tags: Record<string, TagOptions | undefined> = {
232-
[this.peerTagName]: {
233-
value: this.peerTagValue
234-
}
235-
}
236-
237-
if (bucket.containsSelf === true) {
238-
tags[this.closeTagName] = {
239-
value: this.closeTagValue
240-
}
241-
tags[KEEP_ALIVE] = {
242-
value: 1
243-
}
244-
}
245-
246237
await this.components.peerStore.merge(peer.peerId, {
247-
tags
238+
tags: {
239+
[this.peerTagName]: {
240+
value: this.peerTagValue
241+
}
242+
}
248243
})
249244
}
250245

@@ -257,9 +252,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
257252
if (!this.components.peerId.equals(peer.peerId)) {
258253
await this.components.peerStore.merge(peer.peerId, {
259254
tags: {
260-
[this.closeTagName]: undefined,
261-
[this.peerTagName]: undefined,
262-
[KEEP_ALIVE]: undefined
255+
[this.peerTagName]: undefined
263256
}
264257
})
265258
}
@@ -269,30 +262,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
269262
this.safeDispatchEvent('peer:remove', { detail: peer.peerId })
270263
}
271264

272-
private async peerMoved (peer: Peer, oldBucket: LeafBucket, newBucket: LeafBucket): Promise<void> {
273-
if (this.components.peerId.equals(peer.peerId)) {
274-
return
275-
}
276-
277-
const tags: Record<string, TagOptions | undefined> = {
278-
[this.closeTagName]: undefined,
279-
[KEEP_ALIVE]: undefined
280-
}
281-
282-
if (newBucket.containsSelf === true) {
283-
tags[this.closeTagName] = {
284-
value: this.closeTagValue
285-
}
286-
tags[KEEP_ALIVE] = {
287-
value: 1
288-
}
289-
}
290-
291-
await this.components.peerStore.merge(peer.peerId, {
292-
tags
293-
})
294-
}
295-
296265
/**
297266
* Called on the `ping` event from `k-bucket` when a bucket is full
298267
* and cannot split.
@@ -410,6 +379,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
410379
if (event.type === EventTypes.PEER_RESPONSE) {
411380
if (event.messageType === MessageType.PING) {
412381
this.log('contact %p ping ok', contact.peerId)
382+
383+
this.safeDispatchEvent('peer:ping', {
384+
detail: contact.peerId
385+
})
386+
413387
return true
414388
}
415389

0 commit comments

Comments
 (0)