Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions packages/clients/peerbit/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ export type CreateInstanceOptions = (SimpleLibp2pOptions | Libp2pOptions) & {
indexer?: (directory?: string) => Promise<Indices> | Indices;
} & OptionalCreateOptions;

export type DialReadiness =
| "connection"
| "services"
| "services-and-fanout";

export type DialOptions = {
dialTimeoutMs?: number;
serviceWaitTimeoutMs?: number;
readiness?: DialReadiness;
signal?: AbortSignal;
};

const isLibp2pInstance = (libp2p: Libp2pExtended | ClientCreateOptions) =>
!!(libp2p as Libp2p).getMultiaddrs;

Expand Down Expand Up @@ -411,7 +423,7 @@ export class Peerbit implements ProgramClient {
*/
async dial(
address: string | Multiaddr | Multiaddr[] | ProgramClient,
options?: { dialTimeoutMs?: number; signal?: AbortSignal },
options?: DialOptions,
): Promise<boolean> {
const maddress =
typeof address === "string"
Expand Down Expand Up @@ -457,21 +469,42 @@ export class Peerbit implements ProgramClient {
const publicKey = Ed25519PublicKey.fromPeerId(connection.remotePeer);
const peerHash = publicKey.hashcode();

// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
try {
await this.libp2p.services.pubsub.waitFor(peerHash, {
target: "neighbor",
});
} catch (error) {
throw new Error(`Failed to dial peer. Not available on Pubsub`);
const resolvedReadiness: DialReadiness =
options?.readiness ??
(this.libp2p.services.fanout ? "services-and-fanout" : "services");
const serviceWaitTimeoutMs =
options?.serviceWaitTimeoutMs ?? options?.dialTimeoutMs;
if (resolvedReadiness === "connection") {
return true;
}

try {
await this.libp2p.services.blocks.waitFor(peerHash, {
target: "neighbor",
});
} catch (error) {
throw new Error(`Failed to dial peer. Not available on Blocks`);
const waitForNeighbor = async (
label: "Pubsub" | "Blocks" | "Fanout",
service: { waitFor: (peer: string, options?: any) => Promise<any> },
) => {
try {
await service.waitFor(peerHash, {
target: "neighbor",
timeout: serviceWaitTimeoutMs,
signal: dialSignal,
});
} catch (_error) {
throw new Error(`Failed to dial peer. Not available on ${label}`);
}
};

// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
await waitForNeighbor("Pubsub", this.libp2p.services.pubsub);
await waitForNeighbor("Blocks", this.libp2p.services.blocks);

if (resolvedReadiness === "services-and-fanout") {
const fanoutService = (this.libp2p.services as any).fanout;
if (!fanoutService?.waitFor) {
throw new Error(
"Failed to dial peer. Not available on Fanout (service missing)",
);
}
await waitForNeighbor("Fanout", fanoutService);
}

return true;
Expand Down
57 changes: 57 additions & 0 deletions packages/clients/peerbit/test/connect.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,63 @@ describe(`dial`, function () {
await waitForResolved(() => expect(received).to.exist);
expect([...received!]).to.deep.equal([7, 8, 9]);
});

it("waits for fanout by default when fanout service is present", async () => {
const originalWaitFor = clients[0].services.fanout.waitFor.bind(
clients[0].services.fanout,
);
(clients[0].services.fanout as any).waitFor = async () => {
throw new Error("fanout-not-ready");
};

try {
await expect(
clients[0].dial(clients[1].getMultiaddrs()[0], {
dialTimeoutMs: 5_000,
}),
).to.be.rejectedWith("Fanout");
} finally {
(clients[0].services.fanout as any).waitFor = originalWaitFor;
}
});

it("allows skipping fanout readiness checks", async () => {
const originalWaitFor = clients[0].services.fanout.waitFor.bind(
clients[0].services.fanout,
);
(clients[0].services.fanout as any).waitFor = async () => {
throw new Error("fanout-not-ready");
};

try {
const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], {
readiness: "services",
dialTimeoutMs: 5_000,
});
expect(ok).to.be.true;
} finally {
(clients[0].services.fanout as any).waitFor = originalWaitFor;
}
});

it("supports connection-only dial readiness", async () => {
const originalPubsubWaitFor = clients[0].services.pubsub.waitFor.bind(
clients[0].services.pubsub,
);
(clients[0].services.pubsub as any).waitFor = async () => {
throw new Error("pubsub-not-ready");
};

try {
const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], {
readiness: "connection",
dialTimeoutMs: 5_000,
});
expect(ok).to.be.true;
} finally {
(clients[0].services.pubsub as any).waitFor = originalPubsubWaitFor;
}
});
}
it("dialer settings", async () => {
expect(clients[0].services.pubsub.connectionManagerOptions.dialer).to.exist;
Expand Down
10 changes: 9 additions & 1 deletion packages/programs/program/program/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ export interface Client<T extends Manageable<ExtractArgs<T>>> {
peerId: Libp2pPeerId;
identity: Identity<Ed25519PublicKey>;
getMultiaddrs: () => Multiaddr[];
dial(address: string | Multiaddr | Multiaddr[]): Promise<boolean>;
dial(
address: string | Multiaddr | Multiaddr[],
options?: {
dialTimeoutMs?: number;
serviceWaitTimeoutMs?: number;
readiness?: "connection" | "services" | "services-and-fanout";
signal?: AbortSignal;
},
): Promise<boolean>;
hangUp(address: PeerId | PublicSignKey | string | Multiaddr): Promise<void>;
services: {
pubsub: PubSub;
Expand Down
Loading