Skip to content

Commit c485a73

Browse files
feat(peerbit): define dial readiness semantics for fanout (#613)
1 parent 4783d05 commit c485a73

File tree

3 files changed

+113
-15
lines changed

3 files changed

+113
-15
lines changed

packages/clients/peerbit/src/peer.ts

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ export type CreateInstanceOptions = (SimpleLibp2pOptions | Libp2pOptions) & {
6868
indexer?: (directory?: string) => Promise<Indices> | Indices;
6969
} & OptionalCreateOptions;
7070

71+
export type DialReadiness =
72+
| "connection"
73+
| "services"
74+
| "services-and-fanout";
75+
76+
export type DialOptions = {
77+
dialTimeoutMs?: number;
78+
serviceWaitTimeoutMs?: number;
79+
readiness?: DialReadiness;
80+
signal?: AbortSignal;
81+
};
82+
7183
const isLibp2pInstance = (libp2p: Libp2pExtended | ClientCreateOptions) =>
7284
!!(libp2p as Libp2p).getMultiaddrs;
7385

@@ -411,7 +423,7 @@ export class Peerbit implements ProgramClient {
411423
*/
412424
async dial(
413425
address: string | Multiaddr | Multiaddr[] | ProgramClient,
414-
options?: { dialTimeoutMs?: number; signal?: AbortSignal },
426+
options?: DialOptions,
415427
): Promise<boolean> {
416428
const maddress =
417429
typeof address === "string"
@@ -457,21 +469,42 @@ export class Peerbit implements ProgramClient {
457469
const publicKey = Ed25519PublicKey.fromPeerId(connection.remotePeer);
458470
const peerHash = publicKey.hashcode();
459471

460-
// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
461-
try {
462-
await this.libp2p.services.pubsub.waitFor(peerHash, {
463-
target: "neighbor",
464-
});
465-
} catch (error) {
466-
throw new Error(`Failed to dial peer. Not available on Pubsub`);
472+
const resolvedReadiness: DialReadiness =
473+
options?.readiness ??
474+
(this.libp2p.services.fanout ? "services-and-fanout" : "services");
475+
const serviceWaitTimeoutMs =
476+
options?.serviceWaitTimeoutMs ?? options?.dialTimeoutMs;
477+
if (resolvedReadiness === "connection") {
478+
return true;
467479
}
468480

469-
try {
470-
await this.libp2p.services.blocks.waitFor(peerHash, {
471-
target: "neighbor",
472-
});
473-
} catch (error) {
474-
throw new Error(`Failed to dial peer. Not available on Blocks`);
481+
const waitForNeighbor = async (
482+
label: "Pubsub" | "Blocks" | "Fanout",
483+
service: { waitFor: (peer: string, options?: any) => Promise<any> },
484+
) => {
485+
try {
486+
await service.waitFor(peerHash, {
487+
target: "neighbor",
488+
timeout: serviceWaitTimeoutMs,
489+
signal: dialSignal,
490+
});
491+
} catch (_error) {
492+
throw new Error(`Failed to dial peer. Not available on ${label}`);
493+
}
494+
};
495+
496+
// TODO, do this as a promise instead using the onPeerConnected vents in pubsub and blocks
497+
await waitForNeighbor("Pubsub", this.libp2p.services.pubsub);
498+
await waitForNeighbor("Blocks", this.libp2p.services.blocks);
499+
500+
if (resolvedReadiness === "services-and-fanout") {
501+
const fanoutService = (this.libp2p.services as any).fanout;
502+
if (!fanoutService?.waitFor) {
503+
throw new Error(
504+
"Failed to dial peer. Not available on Fanout (service missing)",
505+
);
506+
}
507+
await waitForNeighbor("Fanout", fanoutService);
475508
}
476509

477510
return true;

packages/clients/peerbit/test/connect.spec.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,63 @@ describe(`dial`, function () {
8383
await waitForResolved(() => expect(received).to.exist);
8484
expect([...received!]).to.deep.equal([7, 8, 9]);
8585
});
86+
87+
it("waits for fanout by default when fanout service is present", async () => {
88+
const originalWaitFor = clients[0].services.fanout.waitFor.bind(
89+
clients[0].services.fanout,
90+
);
91+
(clients[0].services.fanout as any).waitFor = async () => {
92+
throw new Error("fanout-not-ready");
93+
};
94+
95+
try {
96+
await expect(
97+
clients[0].dial(clients[1].getMultiaddrs()[0], {
98+
dialTimeoutMs: 5_000,
99+
}),
100+
).to.be.rejectedWith("Fanout");
101+
} finally {
102+
(clients[0].services.fanout as any).waitFor = originalWaitFor;
103+
}
104+
});
105+
106+
it("allows skipping fanout readiness checks", async () => {
107+
const originalWaitFor = clients[0].services.fanout.waitFor.bind(
108+
clients[0].services.fanout,
109+
);
110+
(clients[0].services.fanout as any).waitFor = async () => {
111+
throw new Error("fanout-not-ready");
112+
};
113+
114+
try {
115+
const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], {
116+
readiness: "services",
117+
dialTimeoutMs: 5_000,
118+
});
119+
expect(ok).to.be.true;
120+
} finally {
121+
(clients[0].services.fanout as any).waitFor = originalWaitFor;
122+
}
123+
});
124+
125+
it("supports connection-only dial readiness", async () => {
126+
const originalPubsubWaitFor = clients[0].services.pubsub.waitFor.bind(
127+
clients[0].services.pubsub,
128+
);
129+
(clients[0].services.pubsub as any).waitFor = async () => {
130+
throw new Error("pubsub-not-ready");
131+
};
132+
133+
try {
134+
const ok = await clients[0].dial(clients[1].getMultiaddrs()[0], {
135+
readiness: "connection",
136+
dialTimeoutMs: 5_000,
137+
});
138+
expect(ok).to.be.true;
139+
} finally {
140+
(clients[0].services.pubsub as any).waitFor = originalPubsubWaitFor;
141+
}
142+
});
86143
}
87144
it("dialer settings", async () => {
88145
expect(clients[0].services.pubsub.connectionManagerOptions.dialer).to.exist;

packages/programs/program/program/src/client.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,15 @@ export interface Client<T extends Manageable<ExtractArgs<T>>> {
2222
peerId: Libp2pPeerId;
2323
identity: Identity<Ed25519PublicKey>;
2424
getMultiaddrs: () => Multiaddr[];
25-
dial(address: string | Multiaddr | Multiaddr[]): Promise<boolean>;
25+
dial(
26+
address: string | Multiaddr | Multiaddr[],
27+
options?: {
28+
dialTimeoutMs?: number;
29+
serviceWaitTimeoutMs?: number;
30+
readiness?: "connection" | "services" | "services-and-fanout";
31+
signal?: AbortSignal;
32+
},
33+
): Promise<boolean>;
2634
hangUp(address: PeerId | PublicSignKey | string | Multiaddr): Promise<void>;
2735
services: {
2836
pubsub: PubSub;

0 commit comments

Comments
 (0)