Skip to content

Commit f82bc04

Browse files
committed
Remove listeners on stop
1 parent 0a8c5cd commit f82bc04

File tree

2 files changed

+174
-61
lines changed

2 files changed

+174
-61
lines changed

spec/MultiTabWorkerBroker.spec.ts

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,26 @@ describe("MultiTabWorkerBroker", () => {
99
let testCounter = 0;
1010
let getLockName: () => string;
1111

12+
// Helper function to wait for a condition with retry logic
13+
async function waitFor<T>(
14+
fn: () => T | Promise<T>,
15+
predicate: (value: T) => boolean,
16+
options: { timeout?: number; interval?: number; message?: string } = {}
17+
): Promise<T> {
18+
const { timeout = 5000, interval = 10, message = "Condition not met within timeout" } = options;
19+
const startTime = Date.now();
20+
21+
while (Date.now() - startTime < timeout) {
22+
const value = await fn();
23+
if (predicate(value)) {
24+
return value;
25+
}
26+
await new Promise((resolve) => setTimeout(resolve, interval));
27+
}
28+
29+
throw new Error(message);
30+
}
31+
1232
beforeEach(() => {
1333
// Create unique lock name for each test
1434
const lockName = `test-lock-${testCounter++}`;
@@ -215,8 +235,14 @@ describe("MultiTabWorkerBroker", () => {
215235

216236
// Wait for follower to become leader and initialize its worker
217237
await broker2LeaderPromise;
218-
// Give a bit more time for worker to be fully ready
219-
await new Promise((resolve) => setTimeout(resolve, 50));
238+
// Wait for broker2 to be fully ready as leader
239+
await waitFor(
240+
() => broker2.isLeader,
241+
(isLeader) => isLeader === true,
242+
{ timeout: 500, message: "Broker2 did not become leader" }
243+
);
244+
// Give worker additional time to be fully ready
245+
await new Promise((resolve) => setTimeout(resolve, 100));
220246
expect(broker2.isLeader).toBe(true);
221247

222248
// Verify new leader can communicate
@@ -226,7 +252,13 @@ describe("MultiTabWorkerBroker", () => {
226252
method: "echo",
227253
params: { promoted: true },
228254
} as any);
229-
await new Promise((resolve) => setTimeout(resolve, 50));
255+
256+
// Wait for the response with retry logic
257+
await waitFor(
258+
() => followerMessages,
259+
(msgs) => msgs.some((m) => m.id === 1 && m.result?.promoted === true),
260+
{ timeout: 2000, message: "Expected response from promoted leader not received" }
261+
);
230262

231263
expect(followerMessages).toContainEqual(expect.objectContaining({ id: 1, result: { promoted: true } }));
232264

@@ -518,6 +550,52 @@ describe("MultiTabWorkerBroker", () => {
518550
await broker2.stop();
519551
});
520552

553+
it("should let a follower send messages after leader restarts within same tab", async () => {
554+
const lockName = getLockName();
555+
556+
// Tab A: start, stop (without awaiting), and start again (React Strict Mode pattern)
557+
const leader = new MultiTabWorkerBroker(lockName, makeWorker);
558+
await leader.start();
559+
const stopPromise = leader.stop();
560+
await leader.start();
561+
562+
expect(leader.isLeader).toBe(true);
563+
564+
// Tab B: start as follower
565+
const follower = new MultiTabWorkerBroker(lockName, makeWorker, { timeout: 50 });
566+
await follower.start();
567+
expect(follower.isLeader).toBe(false);
568+
569+
const followerMessages: any[] = [];
570+
const followerConnection = follower.createConnection();
571+
followerConnection.reader.listen((msg) => followerMessages.push(msg));
572+
573+
// First request from follower should succeed (currently fails with "Broker stopped" due to concurrent stop cleanup)
574+
const writePromise = followerConnection.writer.write({
575+
jsonrpc: "2.0",
576+
id: 1,
577+
method: "echo",
578+
params: { tab: "follower" },
579+
} as any);
580+
581+
try {
582+
await expect(writePromise).resolves.toBeUndefined();
583+
584+
// Wait for response delivery
585+
await waitFor(
586+
() => followerMessages,
587+
(msgs) => msgs.some((msg) => msg.id === 1 && msg.result?.tab === "follower"),
588+
{ timeout: 2000, message: "Follower did not receive response after leader restart" }
589+
);
590+
591+
expect(followerMessages).toContainEqual(expect.objectContaining({ id: 1, result: { tab: "follower" } }));
592+
} finally {
593+
await follower.stop();
594+
await leader.stop();
595+
await Promise.race([stopPromise, new Promise((resolve) => setTimeout(resolve, 1000))]);
596+
}
597+
});
598+
521599
it("should timeout when no leader available", async () => {
522600
const lockName = getLockName();
523601
// Use a shorter timeout (100ms) for faster test execution
@@ -1022,10 +1100,14 @@ describe("MultiTabWorkerBroker", () => {
10221100

10231101
const leader = new MultiTabWorkerBroker(lockName, async () => {
10241102
// Delay creating the worker to widen the window for queued requests
1025-
await new Promise((r) => setTimeout(r, 100));
1103+
await new Promise((r) => setTimeout(r, 200));
1104+
return await makeSlowWorker();
1105+
});
1106+
const follower = new MultiTabWorkerBroker(lockName, async () => {
1107+
// Also delay follower's worker creation to test queuing
1108+
await new Promise((r) => setTimeout(r, 200));
10261109
return await makeSlowWorker();
10271110
});
1028-
const follower = new MultiTabWorkerBroker(lockName, makeSlowWorker);
10291111

10301112
const followerMsgs: any[] = [];
10311113
const followerConn = follower.createConnection();
@@ -1042,15 +1124,28 @@ describe("MultiTabWorkerBroker", () => {
10421124
// Force leader to release so follower acquires lock and begins booting
10431125
const stopPromise = leader.stop();
10441126
// Small delay to ensure follower's lock wait proceeds
1045-
await new Promise((r) => setTimeout(r, 10));
1127+
await new Promise((r) => setTimeout(r, 20));
10461128
// Now, before new worker is ready, send a couple of requests
10471129
const p1 = followerConn.writer.write({ jsonrpc: "2.0", id: 1, method: "echo", params: { during: 1 } } as any);
10481130
const p2 = followerConn.writer.write({ jsonrpc: "2.0", id: 2, method: "echo", params: { during: 2 } } as any);
10491131
await stopPromise;
10501132

10511133
// Give time for follower to acquire lock and broker to create worker, then for queue to drain
10521134
await Promise.all([p1, p2]);
1053-
await new Promise((r) => setTimeout(r, 200));
1135+
1136+
// Wait for follower to become leader (needs more time due to 200ms worker delay)
1137+
await waitFor(
1138+
() => follower.isLeader,
1139+
(isLeader) => isLeader === true,
1140+
{ timeout: 3000, message: "Follower did not become leader" }
1141+
);
1142+
1143+
// Wait for both responses with retry logic (allow more time for queued messages to process)
1144+
await waitFor(
1145+
() => followerMsgs,
1146+
(msgs) => msgs.some((m) => m.id === 1 && m.result?.during === 1) && msgs.some((m) => m.id === 2 && m.result?.during === 2),
1147+
{ timeout: 3000, message: "Expected responses not received during leader boot" }
1148+
);
10541149

10551150
expect(follower.isLeader).toBe(true);
10561151
// Both responses should arrive even though they were sent while leader was booting

src/MultiTabWorkerBroker.ts

Lines changed: 72 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ export class MultiTabWorkerBrokerError extends Error {
2222
*/
2323
export class MultiTabWorkerBroker {
2424
isLeader = false;
25+
started = false;
2526

2627
private readonly brokerId: string;
2728
private broadcastChannel: BroadcastChannel | null = null;
2829
private worker: Worker | null = null;
2930
private lockAbortController = new AbortController();
3031
private pendingRequests = new Map<string, { resolve: (message: Message) => void; reject: (error: Error) => void }>();
3132
private nextRequestId = 0;
32-
private started = false;
3333
private timeout: number;
3434
private onStateChange?: (state: { isLeader: boolean }) => void;
3535
// Queue of worker requests received while leader is booting the worker
@@ -44,6 +44,8 @@ export class MultiTabWorkerBroker {
4444
private shouldDebug = false;
4545
// Track the active lock request promise to ensure proper cleanup
4646
private activeLockPromise: Promise<any> | null = null;
47+
// Track an in-flight stop operation so subsequent starts wait for cleanup
48+
private stopPromise: Promise<void> | null = null;
4749

4850
constructor(
4951
private readonly lockName: string,
@@ -104,6 +106,9 @@ export class MultiTabWorkerBroker {
104106

105107
/** Start the broker and attempt to acquire leadership */
106108
async start(): Promise<void> {
109+
if (this.stopPromise) {
110+
await this.stopPromise;
111+
}
107112
if (this.started) {
108113
return;
109114
}
@@ -114,7 +119,7 @@ export class MultiTabWorkerBroker {
114119

115120
// Set up broadcast channel for inter-tab communication
116121
this.broadcastChannel = new BroadcastChannel(this.lockName);
117-
this.broadcastChannel.addEventListener("message", this.handleBroadcastMessage.bind(this));
122+
this.broadcastChannel.addEventListener("message", this.handleBroadcastMessage);
118123

119124
// Try to acquire the lock and become leader - wait until we know our role
120125
await this.tryAcquireLock();
@@ -352,7 +357,12 @@ export class MultiTabWorkerBroker {
352357
this.emitErrorToAllConnections(error);
353358
}
354359

355-
private handleBroadcastMessage(event: MessageEvent): void {
360+
private handleBroadcastMessage = (event: MessageEvent): void => {
361+
// Ignore messages if we've been stopped
362+
if (!this.started) {
363+
return;
364+
}
365+
356366
const brokerMessage = event.data as BrokerMessage;
357367

358368
if (brokerMessage.type === "worker-message") {
@@ -389,7 +399,7 @@ export class MultiTabWorkerBroker {
389399
this.pendingRequests.delete(brokerMessage.id);
390400
}
391401
}
392-
}
402+
};
393403

394404
private async sendMessage(message: Message): Promise<void> {
395405
const originalId = (message as any)?.id;
@@ -459,73 +469,81 @@ export class MultiTabWorkerBroker {
459469

460470
/** Stop the broker and release all resources */
461471
async stop(): Promise<void> {
472+
if (this.stopPromise) {
473+
await this.stopPromise;
474+
return;
475+
}
476+
462477
if (!this.started) {
463478
return;
464479
}
465480

466481
this.started = false;
467482

468-
// Release the lock
469-
if (this.lockAbortController) {
470-
this.lockAbortController.abort();
471-
}
483+
const performStop = async () => {
484+
if (this.lockAbortController) {
485+
this.lockAbortController.abort();
486+
}
472487

473-
// Wait for the lock to be fully released before proceeding
474-
// This is critical to ensure a new broker can immediately acquire the same lock
475-
const lockPromiseToAwait = this.activeLockPromise;
476-
if (lockPromiseToAwait) {
477-
try {
478-
await lockPromiseToAwait;
479-
} catch (error) {
480-
// Ignore abort errors which are expected during stop
481-
if (error && (error as any).name !== "AbortError") {
482-
this.error("Error while waiting for lock release:", error);
488+
const lockPromiseToAwait = this.activeLockPromise;
489+
if (lockPromiseToAwait) {
490+
try {
491+
await lockPromiseToAwait;
492+
} catch (error) {
493+
if (error && (error as any).name !== "AbortError") {
494+
this.error("Error while waiting for lock release:", error as Error);
495+
}
483496
}
497+
this.activeLockPromise = null;
484498
}
485-
this.activeLockPromise = null;
486-
}
487499

488-
// Terminate worker if we're the leader
489-
if (this.worker) {
490-
this.worker.terminate();
491-
this.worker = null;
492-
}
500+
if (this.worker) {
501+
this.worker.terminate();
502+
this.worker = null;
503+
}
493504

494-
// Close broadcast channel
495-
if (this.broadcastChannel) {
496-
this.broadcastChannel.close();
497-
this.broadcastChannel = null;
498-
}
505+
const channel = this.broadcastChannel;
506+
if (channel) {
507+
channel.removeEventListener("message", this.handleBroadcastMessage);
508+
channel.close();
509+
this.broadcastChannel = null;
510+
}
499511

500-
// Reject all pending requests
501-
for (const [id, pending] of this.pendingRequests.entries()) {
502-
pending.reject(new Error("Broker stopped"));
503-
}
504-
this.pendingRequests.clear();
512+
for (const [id, pending] of this.pendingRequests.entries()) {
513+
pending.reject(new Error("Broker stopped"));
514+
}
515+
this.pendingRequests.clear();
505516

506-
// Emit close events to all connections
507-
this.emitCloseToAllConnections();
517+
this.emitCloseToAllConnections();
508518

509-
// Dispose all connections
510-
for (const { reader, writer } of this.connections.values()) {
511-
reader.dispose();
512-
writer.dispose();
513-
}
514-
this.connections.clear();
519+
for (const { reader, writer } of this.connections.values()) {
520+
reader.dispose();
521+
writer.dispose();
522+
}
523+
this.connections.clear();
524+
525+
this.workerReadyPromise = null;
526+
this.workerReadyResolver = null;
527+
this.leaderRequestQueue = [];
528+
this.rewrittenIdMap.clear();
529+
530+
const wasLeader = this.isLeader;
531+
this.isLeader = false;
515532

516-
// Reset worker state
517-
this.workerReadyPromise = null;
518-
this.workerReadyResolver = null;
519-
this.leaderRequestQueue = [];
520-
this.rewrittenIdMap.clear();
533+
if (wasLeader) {
534+
this.onStateChange?.({ isLeader: false });
535+
}
536+
};
521537

522-
const wasLeader = this.isLeader;
523-
this.isLeader = false;
538+
this.stopPromise = (async () => {
539+
try {
540+
await performStop();
541+
} finally {
542+
this.stopPromise = null;
543+
}
544+
})();
524545

525-
// Notify state change if we were the leader
526-
if (wasLeader) {
527-
this.onStateChange?.({ isLeader: false });
528-
}
546+
await this.stopPromise;
529547
}
530548
}
531549

0 commit comments

Comments
 (0)