diff --git a/packages/clients/peerbit/src/peer.ts b/packages/clients/peerbit/src/peer.ts index 56b486482..2d4e3462d 100644 --- a/packages/clients/peerbit/src/peer.ts +++ b/packages/clients/peerbit/src/peer.ts @@ -68,6 +68,18 @@ export type CreateInstanceOptions = (SimpleLibp2pOptions | Libp2pOptions) & { indexer?: (directory?: string) => Promise | Indices; } & OptionalCreateOptions; +export type DialReadiness = + | "connection" + | "services" + | "services-and-fanout"; + +export type DialOptions = { + dialTimeoutMs?: number; + serviceWaitTimeoutMs?: number; + readiness?: DialReadiness; + signal?: AbortSignal; +}; + const isLibp2pInstance = (libp2p: Libp2pExtended | ClientCreateOptions) => !!(libp2p as Libp2p).getMultiaddrs; @@ -411,7 +423,7 @@ export class Peerbit implements ProgramClient { */ async dial( address: string | Multiaddr | Multiaddr[] | ProgramClient, - options?: { dialTimeoutMs?: number; signal?: AbortSignal }, + options?: DialOptions, ): Promise { const maddress = typeof address === "string" @@ -457,21 +469,42 @@ export class Peerbit implements ProgramClient { const publicKey = Ed25519PublicKey.fromPeerId(connection.remotePeer); const peerHash = publicKey.hashcode(); - // TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks - try { - await this.libp2p.services.pubsub.waitFor(peerHash, { - target: "neighbor", - }); - } catch (error) { - throw new Error(`Failed to dial peer. Not available on Pubsub`); + const resolvedReadiness: DialReadiness = + options?.readiness ?? + (this.libp2p.services.fanout ? "services-and-fanout" : "services"); + const serviceWaitTimeoutMs = + options?.serviceWaitTimeoutMs ?? options?.dialTimeoutMs; + if (resolvedReadiness === "connection") { + return true; } - try { - await this.libp2p.services.blocks.waitFor(peerHash, { - target: "neighbor", - }); - } catch (error) { - throw new Error(`Failed to dial peer. Not available on Blocks`); + const waitForNeighbor = async ( + label: "Pubsub" | "Blocks" | "Fanout", + service: { waitFor: (peer: string, options?: any) => Promise }, + ) => { + try { + await service.waitFor(peerHash, { + target: "neighbor", + timeout: serviceWaitTimeoutMs, + signal: dialSignal, + }); + } catch (_error) { + throw new Error(`Failed to dial peer. Not available on ${label}`); + } + }; + + // TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks + await waitForNeighbor("Pubsub", this.libp2p.services.pubsub); + await waitForNeighbor("Blocks", this.libp2p.services.blocks); + + if (resolvedReadiness === "services-and-fanout") { + const fanoutService = (this.libp2p.services as any).fanout; + if (!fanoutService?.waitFor) { + throw new Error( + "Failed to dial peer. Not available on Fanout (service missing)", + ); + } + await waitForNeighbor("Fanout", fanoutService); } return true; diff --git a/packages/clients/peerbit/test/connect.spec.ts b/packages/clients/peerbit/test/connect.spec.ts index 34dc2e048..db341f93a 100644 --- a/packages/clients/peerbit/test/connect.spec.ts +++ b/packages/clients/peerbit/test/connect.spec.ts @@ -83,6 +83,63 @@ describe(`dial`, function () { await waitForResolved(() => expect(received).to.exist); expect([...received!]).to.deep.equal([7, 8, 9]); }); + + it("waits for fanout by default when fanout service is present", async () => { + const originalWaitFor = clients[0].services.fanout.waitFor.bind( + clients[0].services.fanout, + ); + (clients[0].services.fanout as any).waitFor = async () => { + throw new Error("fanout-not-ready"); + }; + + try { + await expect( + clients[0].dial(clients[1].getMultiaddrs()[0], { + dialTimeoutMs: 5_000, + }), + ).to.be.rejectedWith("Fanout"); + } finally { + (clients[0].services.fanout as any).waitFor = originalWaitFor; + } + }); + + it("allows skipping fanout readiness checks", async () => { + const originalWaitFor = clients[0].services.fanout.waitFor.bind( + clients[0].services.fanout, + ); + (clients[0].services.fanout as any).waitFor = async () => { + throw new Error("fanout-not-ready"); + }; + + try { + const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], { + readiness: "services", + dialTimeoutMs: 5_000, + }); + expect(ok).to.be.true; + } finally { + (clients[0].services.fanout as any).waitFor = originalWaitFor; + } + }); + + it("supports connection-only dial readiness", async () => { + const originalPubsubWaitFor = clients[0].services.pubsub.waitFor.bind( + clients[0].services.pubsub, + ); + (clients[0].services.pubsub as any).waitFor = async () => { + throw new Error("pubsub-not-ready"); + }; + + try { + const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], { + readiness: "connection", + dialTimeoutMs: 5_000, + }); + expect(ok).to.be.true; + } finally { + (clients[0].services.pubsub as any).waitFor = originalPubsubWaitFor; + } + }); } it("dialer settings", async () => { expect(clients[0].services.pubsub.connectionManagerOptions.dialer).to.exist; diff --git a/packages/programs/program/program/src/client.ts b/packages/programs/program/program/src/client.ts index debf67c1e..19230ce91 100644 --- a/packages/programs/program/program/src/client.ts +++ b/packages/programs/program/program/src/client.ts @@ -22,7 +22,15 @@ export interface Client>> { peerId: Libp2pPeerId; identity: Identity; getMultiaddrs: () => Multiaddr[]; - dial(address: string | Multiaddr | Multiaddr[]): Promise; + dial( + address: string | Multiaddr | Multiaddr[], + options?: { + dialTimeoutMs?: number; + serviceWaitTimeoutMs?: number; + readiness?: "connection" | "services" | "services-and-fanout"; + signal?: AbortSignal; + }, + ): Promise; hangUp(address: PeerId | PublicSignKey | string | Multiaddr): Promise; services: { pubsub: PubSub;