|
| 1 | +import { randomBytes } from '@libp2p/crypto' |
| 2 | +import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' |
| 3 | +import { anySignal } from 'any-signal' |
| 4 | +import pDefer, { type DeferredPromise } from 'p-defer' |
| 5 | +import { raceEvent } from 'race-event' |
| 6 | +import { raceSignal } from 'race-signal' |
| 7 | +import type { AbortOptions, ComponentLogger, Logger, PeerInfo, PeerRouting, Startable } from '@libp2p/interface' |
| 8 | +import type { RandomWalk as RandomWalkInterface } from '@libp2p/interface-internal' |
| 9 | + |
| 10 | +export interface RandomWalkComponents { |
| 11 | + peerRouting: PeerRouting |
| 12 | + logger: ComponentLogger |
| 13 | +} |
| 14 | + |
| 15 | +interface RandomWalkEvents { |
| 16 | + 'walk:peer': CustomEvent<PeerInfo> |
| 17 | + 'walk:error': CustomEvent<Error> |
| 18 | +} |
| 19 | + |
| 20 | +export class RandomWalk extends TypedEventEmitter<RandomWalkEvents> implements RandomWalkInterface, Startable { |
| 21 | + private readonly peerRouting: PeerRouting |
| 22 | + private readonly log: Logger |
| 23 | + private walking: boolean |
| 24 | + private walkers: number |
| 25 | + private shutdownController: AbortController |
| 26 | + private walkController?: AbortController |
| 27 | + private needNext?: DeferredPromise<void> |
| 28 | + |
| 29 | + constructor (components: RandomWalkComponents) { |
| 30 | + super() |
| 31 | + |
| 32 | + this.log = components.logger.forComponent('libp2p:random-walk') |
| 33 | + this.peerRouting = components.peerRouting |
| 34 | + this.walkers = 0 |
| 35 | + this.walking = false |
| 36 | + |
| 37 | + // stops any in-progress walks when the node is shut down |
| 38 | + this.shutdownController = new AbortController() |
| 39 | + setMaxListeners(Infinity, this.shutdownController.signal) |
| 40 | + } |
| 41 | + |
| 42 | + start (): void { |
| 43 | + this.shutdownController = new AbortController() |
| 44 | + setMaxListeners(Infinity, this.shutdownController.signal) |
| 45 | + } |
| 46 | + |
| 47 | + stop (): void { |
| 48 | + this.shutdownController.abort() |
| 49 | + } |
| 50 | + |
| 51 | + async * walk (options?: AbortOptions): AsyncGenerator<PeerInfo> { |
| 52 | + if (!this.walking) { |
| 53 | + // start the query that causes walk:peer events to be emitted |
| 54 | + this.startWalk() |
| 55 | + } |
| 56 | + |
| 57 | + this.walkers++ |
| 58 | + const signal = anySignal([this.shutdownController.signal, options?.signal]) |
| 59 | + setMaxListeners(Infinity, signal) |
| 60 | + |
| 61 | + try { |
| 62 | + while (true) { |
| 63 | + // if another consumer has paused the query, start it again |
| 64 | + this.needNext?.resolve() |
| 65 | + this.needNext = pDefer() |
| 66 | + |
| 67 | + // wait for a walk:peer or walk:error event |
| 68 | + const event = await raceEvent<CustomEvent<PeerInfo>>(this, 'walk:peer', signal, { |
| 69 | + errorEvent: 'walk:error' |
| 70 | + }) |
| 71 | + |
| 72 | + yield event.detail |
| 73 | + } |
| 74 | + } finally { |
| 75 | + signal.clear() |
| 76 | + this.walkers-- |
| 77 | + |
| 78 | + // stop the walk if no more consumers are interested |
| 79 | + if (this.walkers === 0) { |
| 80 | + this.walkController?.abort() |
| 81 | + this.walkController = undefined |
| 82 | + } |
| 83 | + } |
| 84 | + } |
| 85 | + |
| 86 | + private startWalk (): void { |
| 87 | + this.walking = true |
| 88 | + |
| 89 | + // the signal for this controller will be aborted if no more random peers |
| 90 | + // are required |
| 91 | + this.walkController = new AbortController() |
| 92 | + setMaxListeners(Infinity, this.walkController.signal) |
| 93 | + |
| 94 | + const signal = anySignal([this.walkController.signal, this.shutdownController.signal]) |
| 95 | + setMaxListeners(Infinity, signal) |
| 96 | + |
| 97 | + const start = Date.now() |
| 98 | + let found = 0 |
| 99 | + |
| 100 | + Promise.resolve().then(async () => { |
| 101 | + this.log('start walk') |
| 102 | + |
| 103 | + // find peers until no more consumers are interested |
| 104 | + while (this.walkers > 0) { |
| 105 | + try { |
| 106 | + for await (const peer of this.peerRouting.getClosestPeers(randomBytes(32), { signal })) { |
| 107 | + signal.throwIfAborted() |
| 108 | + |
| 109 | + this.log('found peer %p', peer.id) |
| 110 | + found++ |
| 111 | + this.safeDispatchEvent('walk:peer', { |
| 112 | + detail: peer |
| 113 | + }) |
| 114 | + |
| 115 | + // if we only have one consumer, pause the query until they request |
| 116 | + // another random peer or they signal they are no longer interested |
| 117 | + if (this.walkers === 1 && this.needNext != null) { |
| 118 | + await raceSignal(this.needNext.promise, signal) |
| 119 | + } |
| 120 | + } |
| 121 | + } catch (err) { |
| 122 | + this.log.error('randomwalk errored', err) |
| 123 | + |
| 124 | + this.safeDispatchEvent('walk:error', { |
| 125 | + detail: err |
| 126 | + }) |
| 127 | + } |
| 128 | + } |
| 129 | + }) |
| 130 | + .catch(err => { |
| 131 | + this.log.error('randomwalk errored', err) |
| 132 | + }) |
| 133 | + .finally(() => { |
| 134 | + this.log('finished walk, found %d peers after %dms', found, Date.now() - start) |
| 135 | + this.walking = false |
| 136 | + }) |
| 137 | + } |
| 138 | +} |
0 commit comments