Skip to content

Commit 14881a0

Browse files
fix(pubsub): treat pending subscriptions as local interest (#602)
* fix(pubsub): treat pending subscriptions as local interest * Stabilize shared-log replication test and remove dead blog link * Relax shared-log timeout assertion for CI jitter
1 parent 000e3f1 commit 14881a0

File tree

5 files changed

+119
-8
lines changed

5 files changed

+119
-8
lines changed

docs/blog/2026-02-02-interactive-fanout-visualizer.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ We did not invent broadcast trees. We are adapting well-known ideas to a Peerbit
123123

124124
Some helpful mental comparisons:
125125
- **libp2p Gossipsub**: a strong baseline for decentralized pubsub, but at huge audience sizes you can still run into control-plane overhead patterns that look like subscription gossip amplification. Fanout Trees aim for a delivery pattern that is closer to "one transmission per subscriber" than "one transmission per underlay edge in a mesh" ([Gossipsub spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/README.md)).
126-
- **Netflix / Twitch**: their distribution "tree" is mostly an operator-controlled hierarchy (origin -> regional -> edge) plus caching, and the last hop is client unicast (the audience does not relay). Netflix's [Open Connect](https://openconnect.netflix.com/en/) is a good concrete example of this pattern: a CDN delivered by peering + appliances, with centralized control-plane decisions and an operator paying the bandwidth bill. Twitch's live video system has similar centralized characteristics (ingest, processing, and distribution are run by the platform rather than by viewers relaying to each other), described in [Ingesting Video at Worldwide Scale](https://blog.twitch.tv/en/2024/09/13/ingesting-video-at-worldwide-scale/). Fanout Trees are chasing a similar bounded-fanout delivery shape, but with relays selected by a join protocol and capacity limits enforced by peers rather than by a single provider.
126+
- **Netflix / Twitch**: their distribution "tree" is mostly an operator-controlled hierarchy (origin -> regional -> edge) plus caching, and the last hop is client unicast (the audience does not relay). Netflix's [Open Connect](https://openconnect.netflix.com/en/) is a good concrete example of this pattern: a CDN delivered by peering + appliances, with centralized control-plane decisions and an operator paying the bandwidth bill. Twitch's live video system has similar centralized characteristics (ingest, processing, and distribution are run by the platform rather than by viewers relaying to each other). Fanout Trees are chasing a similar bounded-fanout delivery shape, but with relays selected by a join protocol and capacity limits enforced by peers rather than by a single provider.
127127
- **Tor**: not a broadcast system, but it is a mature overlay where relays expose capacity and policy and clients build routes through multiple relays. The overlap here is operational: dialing/keeping connections, handling churn, and making capacity-aware routing decisions without melting the network. One key connection is discovery: Tor uses directory authorities to publish and agree on network status, so clients can find relays and weight paths by capacity ([Tor Directory Protocol v3](https://spec.torproject.org/dir-spec/)). Fanout Trees are about high-throughput fanout delivery, so the threat model and routing goals are different, but the "directory/tracker + capacity-aware route selection" idea is very much shared.
128128
- **Iroh**: Iroh's [`iroh-gossip`](https://docs.rs/iroh-gossip/latest/iroh_gossip/) explicitly builds on *Epidemic Broadcast Trees* (HyParView membership + PlumTree broadcast), so it is in the same family of "tree push + bounded repair over a partial view." The main differences are scope and control: Iroh's gossip is symmetric (any peer can broadcast within a topic swarm) and does not assume a designated root, while Peerbit Fanout Trees are channel-rooted and root-sequenced (publishes can be proxied upstream to a root that assigns sequence numbers). Fanout Trees also add tracker-backed, capacity-aware admission (max children, upload budgets, optional bidding) plus route tokens for economical unicast/proxy-publish inside the same overlay. The tradeoff is explicit: a root is a coordination point, so the long-term answer is multi-root sharding and/or root rotation when you want many writers at once.
129129

packages/programs/data/shared-log/test/events.spec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,8 @@ describe("events", () => {
352352
}),
353353
).to.be.eventually.rejectedWith("Timeout");
354354
let t1 = Date.now();
355-
expect(t1 - t0).to.be.greaterThanOrEqual(timeout);
355+
// Allow small timer jitter on busy CI runners.
356+
expect(t1 - t0).to.be.greaterThanOrEqual(timeout - 25);
356357
});
357358

358359
it("will wait for role age", async () => {

packages/programs/data/shared-log/test/replication.spec.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,11 @@ testSetups.forEach((setup) => {
204204
const fromHash = session.peers[0].identity.publicKey.hashcode();
205205

206206
// Ensure we start from a clean state on db2 for db1's ranges.
207-
await db1.log.rpc.send(
208-
new AllReplicatingSegmentsMessage({ segments: [] }),
209-
);
207+
// Clearing through the network can race with periodic replication
208+
// announcements and make this test flaky under CI load.
209+
await db2.log.replicationIndex.del({
210+
query: { hash: fromHash },
211+
});
210212
await waitForResolved(
211213
async () =>
212214
expect(

packages/transport/pubsub/src/index.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ export class TopicControlPlane
228228
public peerToTopic: Map<string, Set<string>>;
229229
// Local topic -> reference count.
230230
public subscriptions: Map<string, { counter: number }>;
231+
// Local topics requested via debounced subscribe, not yet applied in `subscriptions`.
232+
private pendingSubscriptions: Set<string>;
231233
public lastSubscriptionMessages: Map<string, Map<string, bigint>> = new Map();
232234
public dispatchEventOnSelfPublish: boolean;
233235
public readonly topicRootControlPlane: TopicRootControlPlane;
@@ -289,6 +291,7 @@ export class TopicControlPlane
289291
) {
290292
super(components, ["/peerbit/topic-control-plane/2.0.0"], props);
291293
this.subscriptions = new Map();
294+
this.pendingSubscriptions = new Set();
292295
this.topics = new Map();
293296
this.peerToTopic = new Map();
294297

@@ -448,6 +451,7 @@ export class TopicControlPlane
448451
this.autoCandidatesGossipUntil = 0;
449452

450453
this.subscriptions.clear();
454+
this.pendingSubscriptions.clear();
451455
this.topics.clear();
452456
this.peerToTopic.clear();
453457
this.lastSubscriptionMessages.clear();
@@ -760,11 +764,24 @@ export class TopicControlPlane
760764
const subscriptions: string[] = [];
761765
if (topics) {
762766
for (const topic of topics) {
763-
if (this.subscriptions.get(topic)) subscriptions.push(topic);
767+
if (
768+
this.subscriptions.get(topic) ||
769+
this.pendingSubscriptions.has(topic)
770+
) {
771+
subscriptions.push(topic);
772+
}
764773
}
765774
return subscriptions;
766775
}
767-
for (const [topic] of this.subscriptions) subscriptions.push(topic);
776+
const seen = new Set<string>();
777+
for (const [topic] of this.subscriptions) {
778+
subscriptions.push(topic);
779+
seen.add(topic);
780+
}
781+
for (const topic of this.pendingSubscriptions) {
782+
if (seen.has(topic)) continue;
783+
subscriptions.push(topic);
784+
}
768785
return subscriptions;
769786
}
770787

@@ -1098,6 +1115,10 @@ export class TopicControlPlane
10981115
}
10991116

11001117
async subscribe(topic: string) {
1118+
this.pendingSubscriptions.add(topic);
1119+
// `subscribe()` is debounced; start tracking immediately to avoid dropping
1120+
// inbound subscription traffic during the debounce window.
1121+
this.initializeTopic(topic);
11011122
return this.debounceSubscribeAggregator.add({ key: topic });
11021123
}
11031124

@@ -1111,10 +1132,12 @@ export class TopicControlPlane
11111132
let prev = this.subscriptions.get(topic);
11121133
if (prev) {
11131134
prev.counter += counter;
1135+
this.pendingSubscriptions.delete(topic);
11141136
continue;
11151137
}
11161138
this.subscriptions.set(topic, { counter });
11171139
this.initializeTopic(topic);
1140+
this.pendingSubscriptions.delete(topic);
11181141

11191142
const shardTopic = this.getShardTopicForUserTopic(topic);
11201143
byShard.set(shardTopic, [...(byShard.get(shardTopic) ?? []), topic]);
@@ -1156,8 +1179,13 @@ export class TopicControlPlane
11561179
data?: Uint8Array;
11571180
},
11581181
) {
1182+
this.pendingSubscriptions.delete(topic);
1183+
11591184
if (this.debounceSubscribeAggregator.has(topic)) {
11601185
this.debounceSubscribeAggregator.delete(topic);
1186+
if (!this.subscriptions.has(topic)) {
1187+
this.untrackTopic(topic);
1188+
}
11611189
return false;
11621190
}
11631191

@@ -1736,7 +1764,7 @@ export class TopicControlPlane
17361764

17371765
if (pubsubMessage instanceof PubSubData) {
17381766
const wantsTopic = pubsubMessage.topics.some((t) =>
1739-
this.subscriptions.has(t),
1767+
this.subscriptions.has(t) || this.pendingSubscriptions.has(t),
17401768
);
17411769
isForMe = pubsubMessage.strict ? isForMe && wantsTopic : wantsTopic;
17421770
}

packages/transport/pubsub/test/fanout-topics.spec.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { getPublicKeyFromPeerId, randomBytes } from "@peerbit/crypto";
22
import { TestSession } from "@peerbit/libp2p-test-utils";
33
import {
4+
PubSubData,
45
PubSubMessage,
56
Subscribe,
67
type DataEvent as PubSubDataEvent,
78
} from "@peerbit/pubsub-interface";
9+
import { SilentDelivery } from "@peerbit/stream-interface";
810
import { waitForResolved } from "@peerbit/time";
911
import { expect } from "chai";
1012
import { FanoutTree, TopicControlPlane, TopicRootControlPlane } from "../src/index.js";
@@ -331,4 +333,82 @@ describe("pubsub (fanout topics)", function () {
331333
await session.stop();
332334
}
333335
});
336+
337+
it("tracks pending subscribe immediately and cleans up if cancelled before debounce", async () => {
338+
const { session } = await createSession(1, {
339+
pubsub: {
340+
subscriptionDebounceDelay: 500,
341+
},
342+
});
343+
344+
try {
345+
const topic = "pending-subscribe-topic";
346+
const pubsub = session.peers[0]!.services.pubsub as any;
347+
348+
const subscribePromise = pubsub.subscribe(topic);
349+
expect(pubsub.topics.has(topic)).to.equal(true);
350+
expect(pubsub.subscriptions.has(topic)).to.equal(false);
351+
expect(pubsub.pendingSubscriptions.has(topic)).to.equal(true);
352+
expect(pubsub.getSubscriptionOverlap([topic])).to.deep.equal([topic]);
353+
354+
const removed = await pubsub.unsubscribe(topic);
355+
expect(removed).to.equal(false);
356+
expect(pubsub.pendingSubscriptions.has(topic)).to.equal(false);
357+
expect(pubsub.subscriptions.has(topic)).to.equal(false);
358+
expect(pubsub.topics.has(topic)).to.equal(false);
359+
360+
await subscribePromise;
361+
} finally {
362+
await session.stop();
363+
}
364+
});
365+
366+
it("accepts strict direct delivery while subscribe is pending", async () => {
367+
const { session } = await createSession(2, {
368+
pubsub: {
369+
subscriptionDebounceDelay: 500,
370+
},
371+
});
372+
373+
try {
374+
const topic = "pending-strict-delivery-topic";
375+
const sender = session.peers[0]!.services.pubsub;
376+
const receiver = session.peers[1]!.services.pubsub;
377+
const received: Uint8Array[] = [];
378+
379+
receiver.addEventListener("data", (ev: any) => {
380+
const detail = ev.detail as PubSubDataEvent;
381+
if (!detail?.data?.topics?.includes?.(topic)) return;
382+
received.push(detail.data.data);
383+
});
384+
385+
const pendingSubscribe = receiver.subscribe(topic);
386+
const payload = new Uint8Array([7, 9, 11, 13]);
387+
const strictMessage = await (sender as any).createMessage(
388+
new PubSubData({ topics: [topic], data: payload, strict: true }).bytes(),
389+
{
390+
mode: new SilentDelivery({
391+
to: [receiver.publicKeyHash],
392+
redundancy: 1,
393+
}),
394+
skipRecipientValidation: true,
395+
},
396+
);
397+
await receiver.onDataMessage(
398+
sender.publicKey,
399+
{} as any,
400+
strictMessage,
401+
0,
402+
);
403+
404+
await waitForResolved(() => {
405+
expect(received).to.have.length(1);
406+
expect([...received[0]!]).to.deep.equal([...payload]);
407+
});
408+
409+
await pendingSubscribe;
410+
} finally {
411+
await session.stop();
412+
}
413+
});
334414
});

0 commit comments

Comments
 (0)