Skip to content

Commit be6ac5e

Browse files
feat(transport): unify directstream and fanout route hints
1 parent 14881a0 commit be6ac5e

File tree

8 files changed

+189
-1
lines changed

8 files changed

+189
-1
lines changed

packages/transport/pubsub-interface/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type PeerEvents,
1010
type PriorityOptions,
1111
type PublicKeyFromHashResolver,
12+
type RouteHint,
1213
type WaitForPeer,
1314
type WithExtraSigners,
1415
} from "@peerbit/stream-interface";
@@ -136,6 +137,10 @@ export interface PubSub
136137
WaitForPeer,
137138
PublicKeyFromHashResolver {
138139
getSubscribers(topic: string): MaybePromise<PublicSignKey[] | undefined>;
140+
getUnifiedRouteHints?(
141+
topic: string,
142+
targetHash: string,
143+
): MaybePromise<RouteHint[]>;
139144

140145
requestSubscribers(topic: string, from?: PublicSignKey): MaybePromise<void>;
141146

packages/transport/pubsub/src/fanout-tree.ts

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import {
88
type PeerStreams,
99
dontThrowIfDeliveryError,
1010
} from "@peerbit/stream";
11-
import { AnyWhere, DataMessage, type StreamEvents } from "@peerbit/stream-interface";
11+
import {
12+
AnyWhere,
13+
DataMessage,
14+
type FanoutRouteTokenHint,
15+
type StreamEvents,
16+
} from "@peerbit/stream-interface";
1217
import { AbortError, TimeoutError, delay } from "@peerbit/time";
1318
import { anySignal } from "any-signal";
1419
import { Uint8ArrayList } from "uint8arraylist";
@@ -2400,6 +2405,50 @@ export class FanoutTree extends DirectStream<FanoutTreeEvents> {
24002405
return [...ch.routeFromRoot];
24012406
}
24022407

2408+
public getRouteHint(
2409+
topic: string,
2410+
root: string,
2411+
targetHash: string,
2412+
): FanoutRouteTokenHint | undefined {
2413+
const id = this.getChannelId(topic, root);
2414+
const ch = this.channelsBySuffixKey.get(id.suffixKey);
2415+
if (!ch) return undefined;
2416+
2417+
if (targetHash === this.publicKeyHash && ch.routeFromRoot?.length) {
2418+
return {
2419+
kind: "fanout-token",
2420+
root,
2421+
target: targetHash,
2422+
route: [...ch.routeFromRoot],
2423+
updatedAt: Date.now(),
2424+
};
2425+
}
2426+
if (ch.isRoot && ch.children.has(targetHash)) {
2427+
return {
2428+
kind: "fanout-token",
2429+
root,
2430+
target: targetHash,
2431+
route: [root, targetHash],
2432+
updatedAt: Date.now(),
2433+
};
2434+
}
2435+
2436+
const route = this.getCachedRoute(ch, targetHash);
2437+
if (!route) return undefined;
2438+
const entry = ch.routeByPeer.get(targetHash);
2439+
const updatedAt = entry?.updatedAt ?? Date.now();
2440+
2441+
return {
2442+
kind: "fanout-token",
2443+
root,
2444+
target: targetHash,
2445+
route,
2446+
updatedAt,
2447+
expiresAt:
2448+
ch.routeCacheTtlMs > 0 ? updatedAt + ch.routeCacheTtlMs : undefined,
2449+
};
2450+
}
2451+
24032452
private cacheRoute(ch: ChannelState, route: string[]) {
24042453
if (!route?.length) return;
24052454
if (route[0] !== ch.id.root) return;

packages/transport/pubsub/src/index.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
MessageHeader,
3737
NotStartedError,
3838
type PriorityOptions,
39+
type RouteHint,
3940
SilentDelivery,
4041
type WithExtraSigners,
4142
deliveryModeHasReceiver,
@@ -1280,6 +1281,39 @@ export class TopicControlPlane
12801281
return ret;
12811282
}
12821283

1284+
/**
1285+
* Returns best-effort route hints for a target peer by combining:
1286+
* - DirectStream ACK-learned routes
1287+
* - Fanout route tokens for the topic's shard overlay
1288+
*/
1289+
getUnifiedRouteHints(topic: string, targetHash: string): RouteHint[] {
1290+
const hints: RouteHint[] = [];
1291+
const directHint = this.getBestRouteHint(targetHash);
1292+
if (directHint) {
1293+
hints.push(directHint);
1294+
}
1295+
1296+
const topicString = topic.toString();
1297+
const shardTopic = topicString.startsWith(this.shardTopicPrefix)
1298+
? topicString
1299+
: this.getShardTopicForUserTopic(topicString);
1300+
const shard = this.fanoutChannels.get(shardTopic);
1301+
if (!shard) {
1302+
return hints;
1303+
}
1304+
1305+
const fanoutHint = this.fanout.getRouteHint(
1306+
shardTopic,
1307+
shard.root,
1308+
targetHash,
1309+
);
1310+
if (fanoutHint) {
1311+
hints.push(fanoutHint);
1312+
}
1313+
1314+
return hints;
1315+
}
1316+
12831317
async requestSubscribers(
12841318
topic: string | string[],
12851319
to?: PublicSignKey,

packages/transport/pubsub/test/fanout-topics.spec.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,31 @@ describe("pubsub (fanout topics)", function () {
177177
}
178178
});
179179

180+
it("exposes unified route hints from directstream and fanout", async () => {
181+
const { session, configureBootstraps, configureShards } = await createSession(2);
182+
183+
try {
184+
const TOPIC = "fanout-route-hints-topic";
185+
await configureShards([0]);
186+
configureBootstraps([0]);
187+
188+
await session.peers[0]!.services.pubsub.subscribe(TOPIC);
189+
await session.peers[1]!.services.pubsub.subscribe(TOPIC);
190+
191+
const targetHash = session.peers[1]!.services.pubsub.publicKeyHash;
192+
await waitForResolved(() => {
193+
const hints = session.peers[0]!.services.pubsub.getUnifiedRouteHints!(
194+
TOPIC,
195+
targetHash,
196+
);
197+
expect(hints.some((h) => h.kind === "directstream-ack")).to.equal(true);
198+
expect(hints.some((h) => h.kind === "fanout-token")).to.equal(true);
199+
});
200+
} finally {
201+
await session.stop();
202+
}
203+
});
204+
180205
it("preserves publish id/priority/signatures for fanout-backed topics", async () => {
181206
const { session, configureBootstraps, configureShards } = await createSession(3);
182207

packages/transport/stream-interface/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface StreamEvents extends PeerEvents, MessageEvents {
1818
}
1919

2020
export * from "./messages.js";
21+
export * from "./route-hints.js";
2122

2223
// ---------- wait for peer types ----------
2324
export type Target = "neighbor" | "reachable";
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
export type DirectStreamAckRouteHint = {
2+
kind: "directstream-ack";
3+
from: string;
4+
target: string;
5+
nextHop: string;
6+
distance: number;
7+
session: number;
8+
updatedAt: number;
9+
expiresAt?: number;
10+
};
11+
12+
export type FanoutRouteTokenHint = {
13+
kind: "fanout-token";
14+
root: string;
15+
target: string;
16+
route: string[];
17+
updatedAt: number;
18+
expiresAt?: number;
19+
};
20+
21+
export type RouteHint = DirectStreamAckRouteHint | FanoutRouteTokenHint;

packages/transport/stream/src/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import {
4747
getMsgId,
4848
} from "@peerbit/stream-interface";
4949
import type {
50+
DirectStreamAckRouteHint,
5051
IdOptions,
5152
PeerRefs,
5253
PriorityOptions,
@@ -1748,6 +1749,20 @@ export abstract class DirectStream<
17481749
this.routes.updateSession(key, undefined);
17491750
}
17501751

1752+
public getRouteHints(
1753+
target: string,
1754+
from: string = this.publicKeyHash,
1755+
): DirectStreamAckRouteHint[] {
1756+
return this.routes.getRouteHints(from, target);
1757+
}
1758+
1759+
public getBestRouteHint(
1760+
target: string,
1761+
from: string = this.publicKeyHash,
1762+
): DirectStreamAckRouteHint | undefined {
1763+
return this.routes.getBestRouteHint(from, target);
1764+
}
1765+
17511766
public onPeerSession(key: PublicSignKey, session: number) {
17521767
this.dispatchEvent(
17531768
// TODO types

packages/transport/stream/src/routes.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import type { DirectStreamAckRouteHint } from "@peerbit/stream-interface";
2+
13
export const MAX_ROUTE_DISTANCE = Number.MAX_SAFE_INTEGER - 1;
24

35
const DEFAULT_MAX_FROM_ENTRIES = 2048;
@@ -7,6 +9,7 @@ const DEFAULT_MAX_RELAYS_PER_TARGET = 32;
79
type RelayInfo = {
810
session: number;
911
hash: string;
12+
updatedAt: number;
1013
expireAt?: number;
1114
distance: number;
1215
};
@@ -257,6 +260,7 @@ export class Routes {
257260
if (route.distance > distance) {
258261
route.distance = distance;
259262
route.session = session;
263+
route.updatedAt = +new Date();
260264
route.expireAt = undefined; // remove expiry since we updated
261265
sortRoutes(prev.list);
262266
if (prev.list.length > this.maxRelaysPerTarget) {
@@ -267,6 +271,7 @@ export class Routes {
267271
return isNewRemoteSession ? "restart" : "updated";
268272
} else if (route.distance === distance) {
269273
route.session = session;
274+
route.updatedAt = +new Date();
270275
route.expireAt = undefined; // remove expiry since we updated
271276
if (prev.list.length > this.maxRelaysPerTarget) {
272277
prev.list.length = this.maxRelaysPerTarget;
@@ -290,6 +295,7 @@ export class Routes {
290295
distance,
291296
session,
292297
hash: neighbour,
298+
updatedAt: +new Date(),
293299
expireAt: isOldSession
294300
? +new Date() + this.routeMaxRetentionPeriod
295301
: undefined,
@@ -364,6 +370,38 @@ export class Routes {
364370
return this.routes.get(from)?.get(target);
365371
}
366372

373+
getRouteHints(from: string, target: string): DirectStreamAckRouteHint[] {
374+
const route = this.routes.get(from)?.get(target);
375+
if (!route) {
376+
return [];
377+
}
378+
const now = Date.now();
379+
const out: DirectStreamAckRouteHint[] = [];
380+
for (const next of route.list) {
381+
if (next.expireAt != null && next.expireAt < now) {
382+
continue;
383+
}
384+
out.push({
385+
kind: "directstream-ack",
386+
from,
387+
target,
388+
nextHop: next.hash,
389+
distance: next.distance,
390+
session: next.session,
391+
updatedAt: next.updatedAt,
392+
expiresAt: next.expireAt,
393+
});
394+
}
395+
return out;
396+
}
397+
398+
getBestRouteHint(
399+
from: string,
400+
target: string,
401+
): DirectStreamAckRouteHint | undefined {
402+
return this.getRouteHints(from, target)[0];
403+
}
404+
367405
isReachable(from: string, target: string, maxDistance = MAX_ROUTE_DISTANCE) {
368406
const remoteInfo = this.remoteInfo.get(target);
369407
const routeInfo = this.routes.get(from)?.get(target);

0 commit comments

Comments
 (0)