Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"@multiformats/multiaddr": "^12.2.3",
"any-signal": "^4.1.1",
"interface-datastore": "^8.3.0",
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-length": "^3.0.6",
"it-length-prefixed": "^9.0.4",
Expand Down Expand Up @@ -99,7 +100,6 @@
"datastore-core": "^10.0.0",
"delay": "^6.0.0",
"execa": "^9.1.0",
"it-all": "^3.0.6",
"it-filter": "^3.1.0",
"it-last": "^3.0.6",
"it-pair": "^2.0.6",
Expand Down
3 changes: 2 additions & 1 deletion packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface ContentFetchingInit {
queryManager: QueryManager
network: Network
logPrefix: string
datastorePrefix: string
}

export class ContentFetching {
Expand All @@ -48,7 +49,7 @@ export class ContentFetching {

this.components = components
this.log = components.logger.forComponent(`${logPrefix}:content-fetching`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.validators = validators
this.selectors = selectors
this.peerRouting = peerRouting
Expand Down
6 changes: 4 additions & 2 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
peerRouting: this.peerRouting,
queryManager: this.queryManager,
network: this.network,
logPrefix
logPrefix,
datastorePrefix
})
this.contentRouting = new KADDHTContentRouting(components, {
network: this.network,
Expand All @@ -262,6 +263,7 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
validators: this.validators,
logPrefix,
metricsPrefix,
datastorePrefix,
peerInfoMapper: this.peerInfoMapper
})
this.topologyListener = new TopologyListener(components, {
Expand Down Expand Up @@ -315,7 +317,7 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka

await this.onPeerConnect(peerData)
}).catch(err => {
this.log.error('could not add %p to routing table', peerId, err)
this.log.error('could not add %p to routing table - %e', peerId, err)
})
})

Expand Down
6 changes: 1 addition & 5 deletions packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,10 @@ export class PeerRouting {
const requesterXor = uint8ArrayXor(closerThanKadId, keyKadId)

for (const peerId of ids) {
if (peerId.equals(closerThan)) {
continue
}

const peerKadId = await convertPeerId(peerId)
const peerXor = uint8ArrayXor(peerKadId, keyKadId)

// only include if peer isy closer than requester
// only include if peer is closer than requester
if (uint8ArrayXorCompare(peerXor, requesterXor) !== -1) {
continue
}
Expand Down
8 changes: 5 additions & 3 deletions packages/kad-dht/src/providers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Providers {

constructor (components: ProvidersComponents, init: ProvidersInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:providers`)
this.datastorePrefix = `/${init.datastorePrefix}/provider`
this.datastorePrefix = `${init.datastorePrefix}/provider`
this.datastore = components.datastore
this.lock = init.lock
}
Expand Down Expand Up @@ -70,8 +70,9 @@ export class Providers {
const release = await this.lock.readLock()

try {
this.log('get providers for %s', cid)
this.log('get providers for %c', cid)
const provs = await this.loadProviders(cid)
this.log('got %d providers for %c', provs.size, cid)

return [...provs.keys()]
} finally {
Expand All @@ -94,8 +95,9 @@ export class Providers {
*/
private async loadProviders (cid: CID): Promise<PeerMap<Date>> {
const providers = new PeerMap<Date>()
const key = toProviderKey(this.datastorePrefix, cid)

for await (const entry of this.datastore.query({ prefix: toProviderKey(this.datastorePrefix, cid).toString() })) {
for await (const entry of this.datastore.query({ prefix: key.toString() })) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readProviderTime(entry.value))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/kad-dht/src/reprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
})
this.datastore = components.datastore
this.addressManager = components.addressManager
this.datastorePrefix = `/${init.datastorePrefix}/provider`
this.datastorePrefix = `${init.datastorePrefix}/provider`
this.reprovideThreshold = init.threshold ?? REPROVIDE_THRESHOLD
this.maxQueueSize = init.maxQueueSize ?? REPROVIDE_MAX_QUEUE_SIZE
this.validity = init.validity ?? PROVIDERS_VALIDITY
Expand Down
25 changes: 18 additions & 7 deletions packages/kad-dht/src/rpc/handlers/add-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import * as Digest from 'multiformats/hashes/digest'
import type { Message } from '../../message/dht.js'
import type { Providers } from '../../providers'
import type { DHTMessageHandler } from '../index.js'
import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface'
import type { ComponentLogger, Logger, PeerId, PeerStore } from '@libp2p/interface'

export interface AddProviderComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
}

Expand All @@ -18,12 +20,16 @@ export interface AddProviderHandlerInit {
}

export class AddProviderHandler implements DHTMessageHandler {
private readonly peerId: PeerId
private readonly providers: Providers
private readonly peerStore: PeerStore
private readonly log: Logger

constructor (components: AddProviderComponents, init: AddProviderHandlerInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:add-provider`)
this.peerId = components.peerId
this.providers = init.providers
this.peerStore = components.peerStore
}

async handle (peerId: PeerId, msg: Message): Promise<Message | undefined> {
Expand All @@ -43,12 +49,16 @@ export class AddProviderHandler implements DHTMessageHandler {
this.log.error('no providers found in message')
}

this.log('%p asked us to store provider record for for %c', peerId, cid)
this.log('%p asked us, %p to store provider record for for %c', peerId, this.peerId, cid)

await Promise.all(
msg.providers.map(async (pi) => {
const digest = Digest.decode(pi.id)
const providerId = peerIdFromMultihash(digest)
const providerMultiaddrs = pi.multiaddrs.map(buf => multiaddr(buf))

// Ignore providers not from the originator
if (!peerId.equals(pi.id)) {
if (!peerId.equals(providerId)) {
this.log('invalid provider peer %p from %p', pi.id, peerId)
return
}
Expand All @@ -58,11 +68,12 @@ export class AddProviderHandler implements DHTMessageHandler {
return
}

this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, pi.multiaddrs.map((m) => multiaddr(m).toString()))

const multihash = Digest.decode(pi.id)
this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, providerMultiaddrs)

await this.providers.addProvider(cid, peerIdFromMultihash(multihash))
await this.providers.addProvider(cid, providerId)
await this.peerStore.merge(providerId, {
multiaddrs: providerMultiaddrs
})
})
)

Expand Down
50 changes: 18 additions & 32 deletions packages/kad-dht/src/rpc/handlers/get-providers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { InvalidMessageError } from '@libp2p/interface'
import all from 'it-all'
import map from 'it-map'
import { CID } from 'multiformats/cid'
import { MessageType } from '../../message/dht.js'
import type { PeerInfoMapper } from '../../index.js'
Expand All @@ -17,11 +19,13 @@ export interface GetProvidersHandlerInit {
}

export interface GetProvidersHandlerComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
}

export class GetProvidersHandler implements DHTMessageHandler {
private readonly peerId: PeerId
private readonly peerRouting: PeerRouting
private readonly providers: Providers
private readonly peerStore: PeerStore
Expand All @@ -32,6 +36,7 @@ export class GetProvidersHandler implements DHTMessageHandler {
const { peerRouting, providers, logPrefix } = init

this.log = components.logger.forComponent(`${logPrefix}:rpc:handlers:get-providers`)
this.peerId = components.peerId
this.peerStore = components.peerStore
this.peerRouting = peerRouting
this.providers = providers
Expand All @@ -52,27 +57,33 @@ export class GetProvidersHandler implements DHTMessageHandler {

this.log('%p asking for providers for %s', peerId, cid)

const [peers, closer] = await Promise.all([
this.providers.getProviders(cid),
this.peerRouting.getCloserPeersOffline(msg.key, peerId)
const [providerPeers, closerPeers] = await Promise.all([
all(map(await this.providers.getProviders(cid), async (peerId) => {
const peer = await this.peerStore.get(peerId)
const info: PeerInfo = {
id: peer.id,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
}

return info
})),
this.peerRouting.getCloserPeersOffline(msg.key, this.peerId)
])

const providerPeers = await this._getPeers(peers)
const closerPeers = await this._getPeers(closer.map(({ id }) => id))
const response: Message = {
type: MessageType.GET_PROVIDERS,
key: msg.key,
clusterLevel: msg.clusterLevel,
closer: closerPeers
.map(this.peerInfoMapper)
.filter(({ multiaddrs }) => multiaddrs.length)
.filter(({ id, multiaddrs }) => multiaddrs.length > 0)
.map(peerInfo => ({
id: peerInfo.id.toMultihash().bytes,
multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes)
})),
providers: providerPeers
.map(this.peerInfoMapper)
.filter(({ multiaddrs }) => multiaddrs.length)
.filter(({ id, multiaddrs }) => multiaddrs.length > 0)
.map(peerInfo => ({
id: peerInfo.id.toMultihash().bytes,
multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes)
Expand All @@ -87,29 +98,4 @@ export class GetProvidersHandler implements DHTMessageHandler {
async _getAddresses (peerId: PeerId): Promise<Multiaddr[]> {
return []
}

async _getPeers (peerIds: PeerId[]): Promise<PeerInfo[]> {
const output: PeerInfo[] = []

for (const peerId of peerIds) {
try {
const peer = await this.peerStore.get(peerId)

const peerAfterFilter = this.peerInfoMapper({
id: peerId,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
})

if (peerAfterFilter.multiaddrs.length > 0) {
output.push(peerAfterFilter)
}
} catch (err: any) {
if (err.name !== 'NotFoundError') {
throw err
}
}
}

return output
}
}
3 changes: 2 additions & 1 deletion packages/kad-dht/src/rpc/handlers/get-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { Datastore } from 'interface-datastore'
export interface GetValueHandlerInit {
peerRouting: PeerRouting
logPrefix: string
datastorePrefix: string
}

export interface GetValueHandlerComponents {
Expand All @@ -32,7 +33,7 @@ export class GetValueHandler implements DHTMessageHandler {

constructor (components: GetValueHandlerComponents, init: GetValueHandlerInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:get-value`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.peerStore = components.peerStore
this.datastore = components.datastore
this.peerRouting = init.peerRouting
Expand Down
3 changes: 2 additions & 1 deletion packages/kad-dht/src/rpc/handlers/put-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { Datastore } from 'interface-datastore'
export interface PutValueHandlerInit {
validators: Validators
logPrefix: string
datastorePrefix: string
}

export interface PutValueHandlerComponents {
Expand All @@ -29,7 +30,7 @@ export class PutValueHandler implements DHTMessageHandler {

this.components = components
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:put-value`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.validators = validators
}

Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface RPCInit {
validators: Validators
logPrefix: string
metricsPrefix: string
datastorePrefix: string
peerInfoMapper: PeerInfoMapper
}

Expand Down
Loading
Loading