Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/transport/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,11 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
this.lastSubscriptionMessages.delete(publicKey.hashcode());

if (changed.length > 0) {
const event = new UnsubcriptionEvent(publicKey, changed);
(event as any).reason = "peer-unreachable";
this.dispatchEvent(
new CustomEvent<UnsubcriptionEvent>("unsubscribe", {
detail: new UnsubcriptionEvent(publicKey, changed),
detail: event,
}),
);
}
Expand Down Expand Up @@ -771,9 +773,11 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
}

if (changed.length > 0 && seenBefore === 0) {
const event = new UnsubcriptionEvent(sender, changed);
(event as any).reason = "remote-unsubscribe";
this.dispatchEvent(
new CustomEvent<UnsubcriptionEvent>("unsubscribe", {
detail: new UnsubcriptionEvent(sender, changed),
detail: event,
}),
);
}
Expand Down
177 changes: 177 additions & 0 deletions packages/transport/pubsub/test/bug2-unsubscribe-reason.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* BUG 2: UnsubcriptionEvent lacks .reason property
*
* The upstream UnsubcriptionEvent is dispatched in two different scenarios:
* 1. Peer becomes unreachable (removeSubscriptions → "peer-unreachable")
* 2. Peer explicitly unsubscribes (Unsubscribe message → "remote-unsubscribe")
*
* However, the event.detail has NO .reason field, making it impossible for
* consumers to distinguish WHY an unsubscription happened. This is important
* for connection management UX (e.g., showing "peer went offline" vs
* "peer left the topic").
*
* Fix: set event.reason = "peer-unreachable" | "remote-unsubscribe" on the
* UnsubcriptionEvent before dispatching.
*/
import { TestSession } from "@peerbit/libp2p-test-utils";
import type { UnsubcriptionEvent } from "@peerbit/pubsub-interface";
import { waitForNeighbour } from "@peerbit/stream";
import { delay, waitFor, waitForResolved } from "@peerbit/time";

Check warning on line 19 in packages/transport/pubsub/test/bug2-unsubscribe-reason.spec.ts

View workflow job for this annotation

GitHub Actions / Test (pnpm run test:ci:part-1, Node v22.x, OS ubuntu-22.04)

'waitFor' is defined but never used. Allowed unused vars must match /^_/u
import { expect } from "chai";
import { DirectSub, waitForSubscribers } from "../src/index.js";

describe("BUG 2: UnsubcriptionEvent missing .reason property", function () {
this.timeout(60_000);

describe("peer-unreachable (removeSubscriptions path)", () => {
let session: TestSession<{ pubsub: DirectSub }>;

beforeEach(async () => {
session = await TestSession.disconnected(2, {
services: {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: false,
}),
},
});
});

afterEach(async () => {
await session.stop();
});

it("should include reason='peer-unreachable' when a peer disconnects", async () => {
const TOPIC = "reason-test-unreachable";
const unsubEvents: (UnsubcriptionEvent & { reason?: string })[] = [];

// Both peers subscribe
await session.peers[0].services.pubsub.subscribe(TOPIC);
await session.peers[1].services.pubsub.subscribe(TOPIC);

// Connect
await session.connect([
[session.peers[0], session.peers[1]],
]);

await waitForSubscribers(
session.peers[0],
[session.peers[1]],
TOPIC,
);
await waitForSubscribers(
session.peers[1],
[session.peers[0]],
TOPIC,
);

// Listen for unsubscribe events on peer 1
session.peers[1].services.pubsub.addEventListener(
"unsubscribe",
(e) => {
unsubEvents.push(e.detail as any);
},
);

// Stop peer 0 – this triggers removeSubscriptions on peer 1
await delay(2000);
await session.peers[0].stop();

// Wait for unsubscribe event
await waitForResolved(
() => expect(unsubEvents).to.have.length.greaterThanOrEqual(1),
{ timeout: 30_000 },
);

// BUG: Without the fix, event.reason is undefined
const event = unsubEvents[0];
expect(event.from.equals(session.peers[0].services.pubsub.publicKey))
.to.be.true;
expect(event.topics).to.include(TOPIC);

// This assertion FAILS without the patch
expect((event as any).reason).to.equal("peer-unreachable");
});
});

describe("remote-unsubscribe (Unsubscribe message path)", () => {
let session: TestSession<{ pubsub: DirectSub }>;

beforeEach(async () => {
session = await TestSession.disconnected(3, {
services: {
pubsub: (c) =>
new DirectSub(c, {
canRelayMessage: true,
connectionManager: false,
}),
},
});
});

afterEach(async () => {
await session.stop();
});

it("should include reason='remote-unsubscribe' when a peer explicitly unsubscribes", async () => {
const TOPIC = "reason-test-unsubscribe";
const unsubEvents: (UnsubcriptionEvent & { reason?: string })[] = [];

// Connect first
await session.connect([
[session.peers[0], session.peers[1]],
[session.peers[1], session.peers[2]],
]);

await waitForNeighbour(
session.peers[0].services.pubsub,
session.peers[1].services.pubsub,
);

// Peer 1 needs to track subscriptions
await session.peers[1].services.pubsub.requestSubscribers(TOPIC);

// Peer 0 subscribes
await session.peers[0].services.pubsub.subscribe(TOPIC);

// Wait for peer 1 to learn about peer 0's subscription
await waitForResolved(
() =>
expect(
session.peers[1].services.pubsub.topics
.get(TOPIC)
?.has(session.peers[0].services.pubsub.publicKeyHash),
).to.be.true,
);

// Listen for unsubscribe events on peer 1
session.peers[1].services.pubsub.addEventListener(
"unsubscribe",
(e) => {
unsubEvents.push(e.detail as any);
},
);

// Allow debouncing to settle
await delay(3000);

// Peer 0 explicitly unsubscribes (sends Unsubscribe message)
await session.peers[0].services.pubsub.unsubscribe(TOPIC);

// Wait for unsubscribe event on peer 1
await waitForResolved(
() => expect(unsubEvents).to.have.length.greaterThanOrEqual(1),
{ timeout: 15_000 },
);

const event = unsubEvents[0];
expect(event.from.equals(session.peers[0].services.pubsub.publicKey))
.to.be.true;
expect(event.topics).to.include(TOPIC);

// This assertion FAILS without the patch
expect((event as any).reason).to.equal("remote-unsubscribe");
});
});
});
Loading