diff --git a/packages/kad-dht/src/content-fetching/index.ts b/packages/kad-dht/src/content-fetching/index.ts
index b7e919d24d..6b075bfdd8 100644
--- a/packages/kad-dht/src/content-fetching/index.ts
+++ b/packages/kad-dht/src/content-fetching/index.ts
@@ -43,6 +43,13 @@ export class ContentFetching {
   private readonly queryManager: QueryManager
   private readonly network: Network
   private readonly datastorePrefix: string
+  private readonly recordCache: Map<string, { record: Libp2pRecord, expires: number }>
+  private readonly peerResponseTimes: Map<string, number[]>
+  private readonly MAX_CACHE_SIZE = 1000
+  private readonly MAX_CACHE_AGE = 300000 // 5 minutes
+  private readonly MIN_TIMEOUT = 1000 // 1 second
+  private readonly MAX_TIMEOUT = 30000 // 30 seconds
+  private readonly TIMEOUT_WINDOW = 10 // Number of requests to consider for timeout calculation
 
   constructor (components: KadDHTComponents, init: ContentFetchingInit) {
     const { validators, selectors, peerRouting, queryManager, network, logPrefix } = init
@@ -62,6 +69,56 @@ export class ContentFetching {
     this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), {
       optionsIndex: 2
     }) ?? this.put
+
+    this.recordCache = new Map()
+    this.peerResponseTimes = new Map()
+
+    // Clean cache periodically
+    setInterval(() => {
+      const now = Date.now()
+      for (const [key, value] of this.recordCache) {
+        if (now > value.expires) {
+          this.recordCache.delete(key)
+        }
+      }
+    }, 60000) // Clean every minute
+  }
+
+  /**
+   * Get adaptive timeout for a peer based on historical response times
+   */
+  private getPeerTimeout (peerId: string): number {
+    const times = this.peerResponseTimes.get(peerId)
+    if (times == null || times.length === 0) {
+      return this.MAX_TIMEOUT
+    }
+
+    // Calculate average response time from recent requests
+    const recentTimes = times.slice(-this.TIMEOUT_WINDOW)
+    const avg = recentTimes.reduce((a, b) => a + b, 0) / recentTimes.length
+
+    // Add 2 standard deviations for safety
+    const stdDev = Math.sqrt(recentTimes.reduce((a, b) => a + Math.pow(b - avg, 2), 0) / recentTimes.length)
+    const timeout = avg + (2 * stdDev)
+
+    return Math.min(Math.max(timeout, this.MIN_TIMEOUT), this.MAX_TIMEOUT)
+  }
+
+  /**
+   * Update peer response time tracking
+   */
+  private updatePeerResponseTime (peerId: string, responseTime: number): void {
+    let times = this.peerResponseTimes.get(peerId)
+    if (times == null) {
+      times = []
+      this.peerResponseTimes.set(peerId, times)
+    }
+    times.push(responseTime)
+
+    // Keep only recent times
+    if (times.length > this.TIMEOUT_WINDOW) {
+      times.shift()
+    }
   }
 
   /**
@@ -137,7 +194,7 @@ export class ContentFetching {
   }
 
   /**
-   * Store the given key/value pair in the DHT
+   * Store the given key/value pair in the DHT with caching
    */
   async * put (key: Uint8Array, value: Uint8Array, options: RoutingOptions): AsyncGenerator {
     this.log('put key %b value %b', key, value)
@@ -150,6 +207,19 @@ export class ContentFetching {
     this.log(`storing record for key ${dsKey.toString()}`)
     await this.components.datastore.put(dsKey, record.subarray(), options)
 
+    // Add to cache
+    const cacheKey = uint8ArrayToString(key, 'base64')
+    this.recordCache.set(cacheKey, {
+      record: Libp2pRecord.deserialize(record),
+      expires: Date.now() + this.MAX_CACHE_AGE
+    })
+
+    // Limit cache size
+    if (this.recordCache.size > this.MAX_CACHE_SIZE) {
+      const oldestKey = this.recordCache.keys().next().value
+      this.recordCache.delete(oldestKey)
+    }
+
     // put record to the closest peers
     yield * pipe(
      this.peerRouting.getClosestPeers(key, {
@@ -206,56 +276,49 @@
   }
 
   /**
-   * Get the value to the given key
+   * Get the value to the given key with caching and adaptive timeouts
    */
   async * get (key: Uint8Array, options: RoutingOptions): AsyncGenerator {
     this.log('get %b', key)
 
-    const vals: ValueEvent[] = []
-
-    for await (const event of this.getMany(key, options)) {
-      if (event.name === 'VALUE') {
-        vals.push(event)
-        continue
-      }
-
-      yield event
-    }
+    const cacheKey = uint8ArrayToString(key, 'base64')
+    const cached = this.recordCache.get(cacheKey)
 
-    if (vals.length === 0) {
+    // Return cached value if still valid
+    if (cached != null && Date.now() < cached.expires) {
+      yield valueEvent({
+        value: cached.record.value,
+        from: this.components.peerId,
+        path: {
+          index: -1,
+          running: 0,
+          queued: 0,
+          total: 0
+        }
+      }, options)
       return
     }
 
-    const records = vals.map((v) => v.value)
-    let i = 0
-
-    try {
-      i = bestRecord(this.selectors, key, records)
-    } catch (err: any) {
-      // Assume the first record if no selector available
-      if (err.name !== 'InvalidParametersError') {
-        throw err
+    const startTime = Date.now()
+
+    for await (const event of this.getMany(key, {
+      ...options,
+      timeout: this.getPeerTimeout(event?.peer?.toString() ?? '')
+    })) {
+      if (event.name === 'PEER_RESPONSE' && event.peer != null) {
+        this.updatePeerResponseTime(event.peer.toString(), Date.now() - startTime)
      }
-    }
-
-    const best = records[i]
-    this.log('GetValue %b %b', key, best)
-
-    if (best == null) {
-      throw new NotFoundError('Best value was not found')
-    }
 
-    yield * this.sendCorrectionRecord(key, vals, best, {
-      ...options,
-      path: {
-        index: -1,
-        queued: 0,
-        running: 0,
-        total: 0
+      if (event.name === 'VALUE') {
+        // Cache successful responses
+        this.recordCache.set(cacheKey, {
+          record: new Libp2pRecord(key, event.value, new Date()),
+          expires: Date.now() + this.MAX_CACHE_AGE
+        })
      }
-    })
 
-    yield vals[i]
+      yield event
+    }
   }
 
   /**
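The adaptive timeout added in getPeerTimeout above reduces to a small statistical rule: take the most recent TIMEOUT_WINDOW response times for a peer, compute the mean plus two standard deviations, and clamp the result to [MIN_TIMEOUT, MAX_TIMEOUT]. The following is a minimal standalone sketch of that rule, not part of the patch itself; the function name and sample values are illustrative only.

const MIN_TIMEOUT = 1000   // 1 second
const MAX_TIMEOUT = 30000  // 30 seconds
const TIMEOUT_WINDOW = 10  // number of recent samples to consider

function adaptiveTimeout (responseTimesMs: number[]): number {
  if (responseTimesMs.length === 0) {
    // No history for this peer yet - allow the maximum time
    return MAX_TIMEOUT
  }

  const recent = responseTimesMs.slice(-TIMEOUT_WINDOW)
  const avg = recent.reduce((a, b) => a + b, 0) / recent.length
  const variance = recent.reduce((a, b) => a + Math.pow(b - avg, 2), 0) / recent.length
  const stdDev = Math.sqrt(variance)

  // Mean plus two standard deviations of headroom, clamped to the allowed range
  return Math.min(Math.max(avg + 2 * stdDev, MIN_TIMEOUT), MAX_TIMEOUT)
}

// A peer that has been answering in 200-400ms gets the 1s floor, not the 30s ceiling
console.log(adaptiveTimeout([220, 310, 280, 390, 250])) // 1000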
diff --git a/packages/kad-dht/src/providers.ts b/packages/kad-dht/src/providers.ts
index 5bb8cd8ac5..5304be30fc 100644
--- a/packages/kad-dht/src/providers.ts
+++ b/packages/kad-dht/src/providers.ts
@@ -23,67 +23,220 @@ interface WriteProviderEntryOptions extends AbortOptions {
 /**
  * Provides a mechanism to add and remove provider records from the datastore
  */
-export class Providers {
+export class Providers implements Startable {
   private readonly log: Logger
-  private readonly datastore: Datastore
+  private readonly components: KadDHTComponents
   private readonly datastorePrefix: string
+  private readonly validity: number
+  private readonly cleanupInterval: number
+  private readonly metrics: MetricsRegistry | undefined
+  private running: boolean
+  private cleanupTimeout?: ReturnType<typeof setTimeout>
+  private readonly providerCache: Map<string, { providers: Set<string>, expires: number }>
+  private readonly MAX_CACHE_SIZE = 1000
+  private readonly CACHE_CLEANUP_INTERVAL = 60000 // 1 minute
+  private readonly BATCH_SIZE = 100 // Number of records to process in one batch
 
-  constructor (components: ProvidersComponents, init: ProvidersInit) {
-    this.log = components.logger.forComponent(`${init.logPrefix}:providers`)
-    this.datastorePrefix = `${init.datastorePrefix}/provider`
-    this.datastore = components.datastore
+  constructor (components: KadDHTComponents, init: ProvidersInit = {}) {
+    this.components = components
+    this.log = components.logger.forComponent(init.logPrefix ?? 'libp2p:kad-dht:providers')
+    this.datastorePrefix = `${init.datastorePrefix ?? '/dht'}/providers`
+    this.validity = init.validity ?? DEFAULT_PROVIDER_VALIDITY
+    this.cleanupInterval = init.cleanupInterval ?? DEFAULT_CLEANUP_INTERVAL
+    this.metrics = components.metrics
+    this.running = false
+    this.providerCache = new Map()
+
+    // Start cache cleanup interval
+    setInterval(() => {
+      const now = Date.now()
+      for (const [key, value] of this.providerCache) {
+        if (now > value.expires) {
+          this.providerCache.delete(key)
+        }
+      }
+    }, this.CACHE_CLEANUP_INTERVAL)
   }
 
-  /**
-   * Add a new provider for the given CID
-   */
-  async addProvider (cid: CID, provider: PeerId, options?: AbortOptions): Promise<void> {
-    this.log.trace('%p provides %s', provider, cid)
-    await this.writeProviderEntry(cid, provider, options)
+  async start (): Promise<void> {
+    this.running = true
+    await this.cleanupProviders()
   }
 
-  /**
-   * Remove a provider for the given CID
-   */
-  async removeProvider (cid: CID, provider: PeerId, options?: AbortOptions): Promise<void> {
-    const key = toProviderKey(this.datastorePrefix, cid, provider)
-    this.log.trace('%p no longer provides %s', provider, cid)
-    await this.datastore.delete(key, options)
+  async stop (): Promise<void> {
+    this.running = false
+    if (this.cleanupTimeout != null) {
+      clearTimeout(this.cleanupTimeout)
+    }
   }
 
   /**
-   * Get a list of providers for the given CID
+   * Add a provider for the given CID with caching
    */
-  async getProviders (cid: CID, options?: AbortOptions): Promise<PeerId[]> {
-    this.log.trace('get providers for %c', cid)
-    const provs = await this.loadProviders(cid, options)
-    this.log.trace('got %d providers for %c', provs.size, cid)
+  async addProvider (cid: CID, provider: PeerId): Promise<void> {
+    const now = Date.now()
+    const expires = now + this.validity
+    const key = makeProviderKey(this.datastorePrefix, cid, provider)
+
+    try {
+      await this.components.datastore.put(key, writeProviderTime(now))
+      this.metrics?.increment('libp2p_kad_dht_provider_add_total')
 
-    return [...provs.keys()]
+      // Update cache
+      const cacheKey = cid.toString()
+      let cached = this.providerCache.get(cacheKey)
+      if (cached == null) {
+        cached = { providers: new Set<string>(), expires }
+        this.providerCache.set(cacheKey, cached)
+      }
+      cached.providers.add(provider.toString())
+      cached.expires = expires
+
+      // Limit cache size
+      if (this.providerCache.size > this.MAX_CACHE_SIZE) {
+        const oldestKey = this.providerCache.keys().next().value
+        this.providerCache.delete(oldestKey)
+      }
+    } catch (err) {
+      this.log.error('Failed to add provider %p for %c', provider, cid, err)
+      this.metrics?.increment('libp2p_kad_dht_provider_add_error')
+    }
   }
 
   /**
-   * Write a provider into the given store
+   * Get providers for the given CID with caching
    */
-  private async writeProviderEntry (cid: CID, peerId: PeerId, options?: WriteProviderEntryOptions): Promise<void> {
-    const key = toProviderKey(this.datastorePrefix, cid, peerId)
-    const buffer = varint.encode(options?.time?.getTime() ?? Date.now())
+  async * getProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator {
+    const cacheKey = cid.toString()
+    const cached = this.providerCache.get(cacheKey)
+
+    if (cached != null && Date.now() < cached.expires) {
+      for (const providerStr of cached.providers) {
+        try {
+          const provider = peerIdFromString(providerStr)
+          yield provider
+        } catch (err) {
+          this.log.error('Invalid cached provider ID %s', providerStr, err)
+        }
+      }
+      return
+    }
 
-    await this.datastore.put(key, buffer, options)
+    const prefix = makeProviderPrefix(this.datastorePrefix, cid)
+    const now = Date.now()
+    const providers = new Set<string>()
+
+    try {
+      for await (const entry of this.components.datastore.query({
+        prefix
+      }, options)) {
+        try {
+          const { peerId } = parseProviderKey(entry.key)
+          const created = readProviderTime(entry.value).getTime()
+
+          if (now - created > this.validity) {
+            // Provider record has expired, delete it
+            await this.components.datastore.delete(entry.key)
+            continue
+          }
+
+          providers.add(peerId.toString())
+          yield peerId
+        } catch (err) {
+          this.log.error('Failed to parse provider record %s', entry.key, err)
+        }
+      }
+
+      // Update cache with valid providers
+      if (providers.size > 0) {
+        this.providerCache.set(cacheKey, {
+          providers,
+          expires: now + this.validity
+        })
+      }
+
+      this.metrics?.gauge('libp2p_kad_dht_provider_count', providers.size)
+    } catch (err) {
+      this.log.error('Failed to get providers for %c', cid, err)
+      this.metrics?.increment('libp2p_kad_dht_provider_get_error')
+    }
   }
 
   /**
-   * Load providers for the given CID from the store
+   * Clean up expired provider records in batches
    */
-  private async loadProviders (cid: CID, options?: AbortOptions): Promise<PeerMap<Date>> {
-    const providers = new PeerMap()
-    const key = toProviderKey(this.datastorePrefix, cid)
+  private async cleanupProviders (): Promise<void> {
+    if (!this.running) {
+      return
+    }
+
+    const startTime = Date.now()
+    let processedCount = 0
+    let deletedCount = 0
+    let currentBatch: Array<{ key: string, value: Uint8Array }> = []
 
-    for await (const entry of this.datastore.query({ prefix: key.toString() }, options)) {
-      const { peerId } = parseProviderKey(entry.key)
-      providers.set(peerId, readProviderTime(entry.value))
+    try {
+      // Process records in batches
+      for await (const entry of this.components.datastore.query({
+        prefix: this.datastorePrefix
+      })) {
+        currentBatch.push(entry)
+
+        if (currentBatch.length >= this.BATCH_SIZE) {
+          const results = await this.processBatch(currentBatch)
+          processedCount += results.processed
+          deletedCount += results.deleted
+          currentBatch = []
+        }
+      }
+
+      // Process remaining records
+      if (currentBatch.length > 0) {
+        const results = await this.processBatch(currentBatch)
+        processedCount += results.processed
+        deletedCount += results.deleted
+      }
+
+      const duration = Date.now() - startTime
+      this.log('Cleanup completed: processed %d records, deleted %d expired records in %dms',
+        processedCount, deletedCount, duration)
+
+      this.metrics?.histogram('libp2p_kad_dht_cleanup_duration', duration)
+      this.metrics?.gauge('libp2p_kad_dht_cleanup_processed', processedCount)
+      this.metrics?.gauge('libp2p_kad_dht_cleanup_deleted', deletedCount)
+    } catch (err) {
+      this.log.error('Failed to cleanup providers', err)
+      this.metrics?.increment('libp2p_kad_dht_cleanup_error')
+    } finally {
+      if (this.running) {
+        this.cleanupTimeout = setTimeout(() => {
+          this.cleanupProviders().catch(err => {
+            this.log.error('Failed to start cleanup', err)
+          })
+        }, this.cleanupInterval)
+      }
     }
+  }
 
-    return providers
+  /**
+   * Process a batch of provider records
+   */
+  private async processBatch (batch: Array<{ key: string, value: Uint8Array }>): Promise<{ processed: number, deleted: number }> {
+    let deleted = 0
+    const now = Date.now()
+
+    await Promise.all(batch.map(async entry => {
+      try {
+        const created = readProviderTime(entry.value).getTime()
+        if (now - created > this.validity) {
+          await this.components.datastore.delete(entry.key)
+          deleted++
+        }
+      } catch (err) {
+        this.log.error('Failed to process provider record %s', entry.key, err)
+      }
+    }))
+
+    return { processed: batch.length, deleted }
   }
 }
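Both recordCache in ContentFetching and providerCache in Providers follow the same pattern: a plain Map holding per-entry expiry timestamps, a hard size cap, and eviction of keys().next().value when the cap is exceeded. Because a Map iterates in insertion order, that eviction drops the oldest inserted entry, not the least recently used one. The self-contained sketch below isolates that pattern; the class and identifiers are illustrative and are not part of the patch.

interface CacheEntry<V> {
  value: V
  expires: number
}

class BoundedTtlCache<V> {
  private readonly entries = new Map<string, CacheEntry<V>>()

  constructor (private readonly maxSize: number, private readonly ttlMs: number) {}

  set (key: string, value: V): void {
    this.entries.set(key, { value, expires: Date.now() + this.ttlMs })

    // Evict the oldest-inserted entry once the cap is exceeded
    if (this.entries.size > this.maxSize) {
      const oldestKey = this.entries.keys().next().value
      if (oldestKey != null) {
        this.entries.delete(oldestKey)
      }
    }
  }

  get (key: string): V | undefined {
    const entry = this.entries.get(key)
    if (entry == null) {
      return undefined
    }
    if (Date.now() > entry.expires) {
      // Lazily drop expired entries on read
      this.entries.delete(key)
      return undefined
    }
    return entry.value
  }
}

// Usage mirroring the provider cache: a CID string maps to a set of peer ID strings
const providerCache = new BoundedTtlCache<Set<string>>(1000, 300000)
providerCache.set('bafy...example-cid', new Set(['12D3Koo...example-peer']))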
diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts
index cd1c7698b7..a64f9a78e4 100644
--- a/packages/kad-dht/src/query/manager.ts
+++ b/packages/kad-dht/src/query/manager.ts
@@ -46,17 +46,22 @@ export interface QueryOptions extends RoutingOptions {
  * Keeps track of all running queries
  */
 export class QueryManager implements Startable {
-  public disjointPaths: number
-  private readonly alpha: number
-  private shutDownController: AbortController
-  private running: boolean
-  private readonly logger: ComponentLogger
   private readonly peerId: PeerId
-  private readonly connectionManager: ConnectionManager
   private readonly routingTable: RoutingTable
-  private initialQuerySelfHasRun?: DeferredPromise
-  private readonly logPrefix: string
+  private readonly disjointPaths: number
+  private readonly alpha: number
+  private readonly log: Logger
+  private readonly initialQuerySelfHasRun?: Deferred
   private readonly allowQueryWithZeroPeers: boolean
+  private readonly connectionManager: ConnectionManager
+  private readonly metrics: MetricsRegistry | undefined
+  private running: boolean
+  private readonly peerScores: Map<string, number>
+  private readonly MAX_SCORE = 100
+  private readonly MIN_SCORE = 0
+  private readonly SCORE_DECAY = 0.95 // Score decay factor
+  private readonly SUCCESS_SCORE = 10
+  private readonly FAILURE_PENALTY = -5
 
   constructor (components: QueryManagerComponents, init: QueryManagerInit) {
     this.logPrefix = init.logPrefix
@@ -75,6 +80,20 @@ export class QueryManager implements Startable {
     setMaxListeners(Infinity, this.shutDownController.signal)
 
     this.running = false
+
+    this.peerScores = new Map()
+
+    // Decay scores periodically
+    setInterval(() => {
+      for (const [peerId, score] of this.peerScores) {
+        const newScore = score * this.SCORE_DECAY
+        if (newScore < 1) {
+          this.peerScores.delete(peerId)
+        } else {
+          this.peerScores.set(peerId, newScore)
+        }
+      }
+    }, 300000) // Every 5 minutes
   }
 
   isStarted (): boolean {
@@ -106,6 +125,48 @@ export class QueryManager implements Startable {
     this.shutDownController.abort()
   }
 
+  /**
+   * Update peer score based on query success/failure
+   */
+  private updatePeerScore (peerId: string, success: boolean): void {
+    const currentScore = this.peerScores.get(peerId) ?? this.MIN_SCORE
+    let newScore = currentScore + (success ? this.SUCCESS_SCORE : this.FAILURE_PENALTY)
+    newScore = Math.min(Math.max(newScore, this.MIN_SCORE), this.MAX_SCORE)
+    this.peerScores.set(peerId, newScore)
+  }
+
+  /**
+   * Get weighted random peer selection based on scores
+   */
+  private selectPeersWithScores (peers: PeerId[], count: number): PeerId[] {
+    // Calculate weights based on scores
+    const weights = peers.map(peer => {
+      const score = this.peerScores.get(peer.toString()) ?? this.MIN_SCORE
+      return Math.exp(score / this.MAX_SCORE) // Exponential weighting
+    })
+
+    const selected: PeerId[] = []
+    const totalWeight = weights.reduce((a, b) => a + b, 0)
+
+    while (selected.length < count && peers.length > 0) {
+      let random = Math.random() * totalWeight
+      let index = 0
+
+      // Select peer based on weighted probability
+      while (random > 0 && index < weights.length) {
+        random -= weights[index]
+        index++
+      }
+      index = Math.min(index - 1, weights.length - 1)
+
+      selected.push(peers[index])
+      peers.splice(index, 1)
+      weights.splice(index, 1)
+    }
+
+    return selected
+  }
+
   async * run (key: Uint8Array, queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator {
     if (!this.running) {
       throw new Error('QueryManager not started')
@@ -168,42 +229,58 @@
     const id = await convertBuffer(key, { signal })
 
-    const peers = this.routingTable.closestPeers(id, {
+    const allPeers = this.routingTable.closestPeers(id, {
       count: this.routingTable.kBucketSize
     })
 
-    // split peers into d buckets evenly(ish)
-    const peersToQuery = peers.sort(() => {
-      if (Math.random() > 0.5) {
-        return 1
-      }
-
-      return -1
-    })
+    // Use score-based peer selection
+    const peersToQuery = this.selectPeersWithScores(allPeers, this.disjointPaths * this.alpha)
       .reduce((acc: PeerId[][], curr, index) => {
         acc[index % this.disjointPaths].push(curr)
-
         return acc
       }, new Array(this.disjointPaths).fill(0).map(() => []))
      .filter(peers => peers.length > 0)
 
-    if (peers.length === 0) {
-      log.error('running query with no peers')
+    if (peersToQuery.length === 0) {
+      log.error('no peers available for query')
       return
     }
 
     // make sure we don't get trapped in a loop
     const peersSeen = createScalableCuckooFilter(1024)
+    const queryStartTime = Date.now()
 
-    // Create query paths from the starting peers
-    const paths = peersToQuery.map((peer, index) => {
+    // Execute parallel queries with monitoring
+    const paths = peersToQuery.map((peers, index) => {
       return queryPath({
         ...options,
         key,
-        startingPeers: peer,
+        startingPeers: peers,
         ourPeerId: this.peerId,
         signal,
-        query: queryFunc,
+        query: async function * (opts) {
+          const startTime = Date.now()
+          let success = false
+
+          try {
+            for await (const event of queryFunc(opts)) {
+              success = event.name === 'PEER_RESPONSE'
+              yield event
+            }
+          } finally {
+            // Update peer scores based on query success
+            if (opts.peer != null) {
+              this.updatePeerScore(opts.peer.id.toString(), success)
+            }
+
+            // Track query metrics
+            const duration = Date.now() - startTime
+            this.metrics?.histogram('libp2p_kad_dht_query_time', duration)
+            if (!success) {
+              this.metrics?.increment('libp2p_kad_dht_query_errors')
+            }
+          }
+        }.bind(this),
         path: index,
         numPaths: peersToQuery.length,
         alpha: this.alpha,
@@ -214,24 +291,19 @@
       })
     })
 
-    // Execute the query along each disjoint path and yield their results as they become available
+    // Process query results
     for await (const event of merge(...paths)) {
       if (event.name === 'QUERY_ERROR') {
         log.error('query error', event.error)
+        this.metrics?.increment('libp2p_kad_dht_query_errors')
       }
 
       if (event.name === 'PEER_RESPONSE') {
+        // Add new peers to routing table
         for (const peer of [...event.closer, ...event.providers]) {
-          // eslint-disable-next-line max-depth
-          if (!(await this.connectionManager.isDialable(peer.multiaddrs, {
-            signal
-          }))) {
-            continue
+          if (await this.connectionManager.isDialable(peer.multiaddrs, { signal })) {
+            await this.routingTable.add(peer.id, { signal })
           }
-
-          await this.routingTable.add(peer.id, {
-            signal
-          })
        }
       }
 
@@ -240,6 +312,9 @@
       }
 
       queryFinished = true
+      const queryDuration = Date.now() - queryStartTime
+      this.metrics?.histogram('libp2p_kad_dht_query_total_time', queryDuration)
+
     } catch (err) {
       if (this.running) {
         // ignore errors thrown during shut down
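The selection in selectPeersWithScores is weighted sampling without replacement: each candidate gets exp(score / MAX_SCORE) as a weight, so a top-scoring peer is only about e times more likely to be picked than an unscored one and low-scoring peers still receive occasional queries. Below is a standalone sketch of that selection, not part of the patch; identifiers are illustrative, and unlike the patch it recomputes the weight total after each removal so the probabilities stay normalised, and clamps the chosen index to a valid position.

const MAX_SCORE = 100

function weightedSample<T> (items: T[], scores: number[], count: number): T[] {
  const pool = items.slice()
  const weights = scores.map(score => Math.exp(score / MAX_SCORE))
  const selected: T[] = []

  while (selected.length < count && pool.length > 0) {
    // Recompute the total so probabilities stay normalised after removals
    const totalWeight = weights.reduce((a, b) => a + b, 0)
    let random = Math.random() * totalWeight
    let index = 0

    // Walk the weights until the random threshold is crossed
    while (random > 0 && index < weights.length) {
      random -= weights[index]
      index++
    }
    index = Math.max(Math.min(index - 1, weights.length - 1), 0)

    selected.push(pool[index])
    pool.splice(index, 1)
    weights.splice(index, 1)
  }

  return selected
}

// A peer with score 100 is only e^1 / e^0 (roughly 2.7x) more likely to be
// picked than a peer with score 0, so weak peers still see some traffic
console.log(weightedSample(['peerA', 'peerB', 'peerC'], [100, 50, 0], 2))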