Skip to content

Commit 1929680

Browse files
Merge pull request #609 from Faolain/test/pr596-regression-tests
test(pubsub): add regression coverage for subscribe race edges
2 parents c6403d3 + 9e62689 commit 1929680

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import { getPublicKeyFromPeerId } from "@peerbit/crypto";
2+
import { TestSession } from "@peerbit/libp2p-test-utils";
3+
import { waitForNeighbour } from "@peerbit/stream";
4+
import { delay, waitForResolved } from "@peerbit/time";
5+
import { expect } from "chai";
6+
import { FanoutTree, TopicControlPlane, TopicRootControlPlane } from "../src/index.js";
7+
8+
describe("pubsub (subscribe race regressions)", function () {
9+
const createDisconnectedSession = async (
10+
peerCount: number,
11+
options?: {
12+
pubsub?: Partial<ConstructorParameters<typeof TopicControlPlane>[1]>;
13+
},
14+
) => {
15+
const topicRootControlPlane = new TopicRootControlPlane();
16+
const fanoutByHash = new Map<string, FanoutTree>();
17+
const getOrCreateFanout = (c: any) => {
18+
const hash = getPublicKeyFromPeerId(c.peerId).hashcode();
19+
let fanout = fanoutByHash.get(hash);
20+
if (!fanout) {
21+
fanout = new FanoutTree(c, {
22+
connectionManager: false,
23+
topicRootControlPlane,
24+
});
25+
fanoutByHash.set(hash, fanout);
26+
}
27+
return fanout;
28+
};
29+
30+
return TestSession.disconnected<{
31+
pubsub: TopicControlPlane;
32+
fanout: FanoutTree;
33+
}>(peerCount, {
34+
services: {
35+
fanout: (c: any) => getOrCreateFanout(c),
36+
pubsub: (c: any) =>
37+
new TopicControlPlane(c, {
38+
canRelayMessage: true,
39+
connectionManager: false,
40+
topicRootControlPlane,
41+
fanout: getOrCreateFanout(c),
42+
shardCount: 16,
43+
fanoutJoin: {
44+
timeoutMs: 10_000,
45+
retryMs: 50,
46+
bootstrapEnsureIntervalMs: 200,
47+
trackerQueryIntervalMs: 200,
48+
joinReqTimeoutMs: 1_000,
49+
trackerQueryTimeoutMs: 1_000,
50+
},
51+
...(options?.pubsub || {}),
52+
}),
53+
},
54+
});
55+
};
56+
57+
it("discovers peers when subscribe and connect happen concurrently", async () => {
58+
const TOPIC = "concurrent-subscribe-connect-regression";
59+
const session = await createDisconnectedSession(2);
60+
61+
try {
62+
const a = session.peers[0]!.services.pubsub;
63+
const b = session.peers[1]!.services.pubsub;
64+
65+
await Promise.all([
66+
a.subscribe(TOPIC),
67+
b.subscribe(TOPIC),
68+
session.connect([[session.peers[0], session.peers[1]]]),
69+
]);
70+
71+
await waitForResolved(() => {
72+
const aTopics = a.topics.get(TOPIC);
73+
const bTopics = b.topics.get(TOPIC);
74+
expect(aTopics).to.not.equal(undefined);
75+
expect(bTopics).to.not.equal(undefined);
76+
expect(aTopics?.has(b.publicKeyHash)).to.equal(true);
77+
expect(bTopics?.has(a.publicKeyHash)).to.equal(true);
78+
});
79+
} finally {
80+
await session.stop();
81+
}
82+
});
83+
84+
it("does not track a topic on a peer that never subscribed", async () => {
85+
const TOPIC = "non-subscriber-should-not-track-regression";
86+
const session = await createDisconnectedSession(2);
87+
88+
try {
89+
const a = session.peers[0]!.services.pubsub;
90+
const b = session.peers[1]!.services.pubsub;
91+
92+
await session.connect([[session.peers[0], session.peers[1]]]);
93+
await waitForNeighbour(a, b);
94+
95+
await b.subscribe(TOPIC);
96+
await waitForResolved(() => {
97+
expect(b.subscriptions.has(TOPIC)).to.equal(true);
98+
const bSubscribers = b.getSubscribers(TOPIC);
99+
expect(
100+
bSubscribers?.some((subscriber) => subscriber.hashcode() === b.publicKeyHash),
101+
).to.equal(true);
102+
});
103+
104+
expect(a.topics.has(TOPIC)).to.equal(false);
105+
expect(a.topics.get(TOPIC)).to.equal(undefined);
106+
} finally {
107+
await session.stop();
108+
}
109+
});
110+
111+
it("does not advertise cancelled pending subscriptions to peers", async () => {
112+
const TOPIC = "subscribe-then-unsubscribe-before-debounce-regression";
113+
const debounceDelayMs = 500;
114+
const session = await createDisconnectedSession(2, {
115+
pubsub: {
116+
subscriptionDebounceDelay: debounceDelayMs,
117+
},
118+
});
119+
120+
try {
121+
const a = session.peers[0]!.services.pubsub;
122+
const b = session.peers[1]!.services.pubsub;
123+
124+
await session.connect([[session.peers[0], session.peers[1]]]);
125+
await waitForNeighbour(a, b);
126+
127+
const pendingSubscribe = a.subscribe(TOPIC);
128+
const removed = await a.unsubscribe(TOPIC);
129+
expect(removed).to.equal(false);
130+
131+
await b.subscribe(TOPIC);
132+
133+
// Wait for A's debounced subscribe cycle to settle before asserting.
134+
// This validates that A does not get (stale) advertised at flush time.
135+
await pendingSubscribe;
136+
await delay(debounceDelayMs + 100);
137+
138+
expect(a.topics.has(TOPIC)).to.equal(false);
139+
const bTopics = b.topics.get(TOPIC);
140+
expect(bTopics).to.not.equal(undefined);
141+
expect(bTopics!.has(a.publicKeyHash)).to.equal(false);
142+
} finally {
143+
await session.stop();
144+
}
145+
});
146+
});

0 commit comments

Comments
 (0)