Skip to content

Commit 7af58cd

Browse files
authored
Fix rapid reconnect issue for shared clients (#55)
1 parent bfb50c3 commit 7af58cd

File tree

7 files changed

+413
-22
lines changed

7 files changed

+413
-22
lines changed

apps/web-demo/src/components/MetaMaskMobileDemo.tsx

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import { LocalStorageKVStore } from "@/lib/localStorage-kvstore";
99

1010
// const RELAY_URL = "ws://localhost:8000/connection/websocket";
1111
const RELAY_URL = "wss://mm-sdk-relay.api.cx.metamask.io/connection/websocket";
12-
const HARDCODED_ETH_ACCOUNT = "0x2e404cdebe05098c066f9844aa990722749ed100";
13-
const HARDCODED_SOL_ACCOUNT = "8VMXFL3MN9z1Eg3WNBk4SECk3cKPn7hr7mUQqjwaYPev";
12+
const HARDCODED_ETH_ACCOUNT = "0x3984fd31734648921f9455c71c8a78fa711312bd";
13+
const HARDCODED_SOL_ACCOUNT = "A2k25kuXLKtcorM1C8BvbQYohUrADLqRobBQARY6CtX3";
1414

1515
type LogEntry = {
1616
id: string;
@@ -116,7 +116,7 @@ export default function MetaMaskMobileDemo() {
116116
// Start new connection, which will trigger 'session-request' and start a new timer
117117
await dappClientRef.current.connect({
118118
mode: "trusted",
119-
initialPayload: createSessionRequest
119+
initialPayload: createSessionRequest,
120120
});
121121
} catch (error) {
122122
onError(error instanceof Error ? error : new Error("Unknown error"));
@@ -613,14 +613,15 @@ export default function MetaMaskMobileDemo() {
613613
{dappLogs.map((log) => (
614614
<div
615615
key={log.id}
616-
className={`p-2 rounded text-xs ${log.type === "sent"
617-
? "bg-blue-100 dark:bg-blue-900"
618-
: log.type === "received"
619-
? "bg-green-100 dark:bg-green-900"
620-
: log.type === "notification"
621-
? "bg-yellow-100 dark:bg-yellow-900"
622-
: "bg-gray-200 dark:bg-gray-700"
623-
}`}
616+
className={`p-2 rounded text-xs ${
617+
log.type === "sent"
618+
? "bg-blue-100 dark:bg-blue-900"
619+
: log.type === "received"
620+
? "bg-green-100 dark:bg-green-900"
621+
: log.type === "notification"
622+
? "bg-yellow-100 dark:bg-yellow-900"
623+
: "bg-gray-200 dark:bg-gray-700"
624+
}`}
624625
>
625626
<div className="flex justify-between items-start mb-1">
626627
<span className="font-medium uppercase">{log.type}</span>

apps/web-demo/src/components/Navigation.tsx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ export default function Navigation() {
2828
<Link
2929
key={demo.href}
3030
href={demo.href}
31-
className={`px-3 py-2 rounded-md text-sm font-medium transition-colors ${isActive
31+
className={`px-3 py-2 rounded-md text-sm font-medium transition-colors ${
32+
isActive
3233
? "bg-blue-100 dark:bg-blue-900 text-blue-700 dark:text-blue-200"
3334
: "text-gray-600 dark:text-gray-300 hover:text-gray-900 dark:hover:text-white hover:bg-gray-50 dark:hover:bg-gray-700"
34-
}`}
35+
}`}
3536
>
3637
{demo.name}
3738
</Link>

apps/web-demo/src/lib/encoding-utils.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
import * as pako from 'pako';
1+
import * as pako from "pako";
22

33
/**
44
* Cross-platform base64 encoding
55
* Works in browser, Node.js, and React Native environments
66
*/
77
export function base64Encode(str: string): string {
8-
if (typeof btoa !== 'undefined') {
8+
if (typeof btoa !== "undefined") {
99
// Browser and React Native with polyfills
1010
return btoa(str);
11-
} else if (typeof Buffer !== 'undefined') {
11+
} else if (typeof Buffer !== "undefined") {
1212
// Node.js
13-
return Buffer.from(str).toString('base64');
13+
return Buffer.from(str).toString("base64");
1414
} else {
15-
throw new Error('No base64 encoding method available');
15+
throw new Error("No base64 encoding method available");
1616
}
1717
}
1818

@@ -21,14 +21,14 @@ export function base64Encode(str: string): string {
2121
* Works in browser, Node.js, and React Native environments
2222
*/
2323
export function base64Decode(str: string): string {
24-
if (typeof atob !== 'undefined') {
24+
if (typeof atob !== "undefined") {
2525
// Browser and React Native with polyfills
2626
return atob(str);
27-
} else if (typeof Buffer !== 'undefined') {
27+
} else if (typeof Buffer !== "undefined") {
2828
// Node.js
29-
return Buffer.from(str, 'base64').toString();
29+
return Buffer.from(str, "base64").toString();
3030
} else {
31-
throw new Error('No base64 decoding method available');
31+
throw new Error("No base64 decoding method available");
3232
}
3333
}
3434

packages/core/src/transport/websocket/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
144144
* existing subscription objects in memory, allowing for automatic recovery.
145145
*/
146146
public reconnect(): Promise<void> {
147+
// Check if we are using the SharedCentrifuge and if it has our smart reconnect method.
148+
if (this.centrifuge instanceof SharedCentrifuge && "reconnect" in this.centrifuge) {
149+
return this.centrifuge.reconnect();
150+
}
151+
152+
// Fallback to the original behavior for non-shared connections.
147153
if (this.state === "connecting") {
148154
return new Promise((resolve) => this.centrifuge.once("connected", () => resolve()));
149155
}

packages/core/src/transport/websocket/shared-centrifuge.integration.test.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,4 +373,126 @@ t.describe("SharedCentrifuge Integration Tests", () => {
373373
t.expect(context?.subscriptions.get(channel)).toBeUndefined();
374374
t.expect(context?.centrifuge.getSubscription(channel)).toBeNull();
375375
});
376+
377+
t.test("should handle multiple simultaneous reconnect calls from different instances without connection storms", async () => {
378+
const channel = `session:${uuid()}`;
379+
const numClients = 5;
380+
const clients: SharedCentrifuge[] = [];
381+
382+
// Create multiple clients (simulating multiple wallet connections)
383+
for (let i = 0; i < numClients; i++) {
384+
const client = new SharedCentrifuge(WEBSOCKET_URL, { websocket: WebSocket });
385+
clients.push(client);
386+
instances.push(client);
387+
}
388+
389+
// Connect all clients
390+
const connectPromises = clients.map((client) => {
391+
const promise = waitFor(client, "connected");
392+
client.connect();
393+
return promise;
394+
});
395+
await Promise.all(connectPromises);
396+
397+
// Subscribe all clients to the same channel
398+
const subscriptions = clients.map((client) => {
399+
const sub = client.newSubscription(channel);
400+
sub.subscribe();
401+
return sub;
402+
});
403+
await Promise.all(subscriptions.map((sub) => new Promise((resolve) => sub.once("subscribed", resolve))));
404+
405+
// Verify all clients are connected and subscribed
406+
clients.forEach((client) => {
407+
t.expect(client.state).toBe("connected");
408+
});
409+
410+
// Access the shared context to verify there's only ONE underlying connection
411+
// @ts-expect-error - accessing private property for test
412+
const context = SharedCentrifuge.contexts.get(WEBSOCKET_URL);
413+
t.expect(context).toBeDefined();
414+
t.expect(context?.refcount).toBe(numClients);
415+
416+
// Test multiple reconnect cycles (simulating app going to background/foreground repeatedly)
417+
for (let cycle = 0; cycle < 3; cycle++) {
418+
// Call reconnect on ALL clients simultaneously (this is where the bug would manifest)
419+
const reconnectPromises = clients.map((client) => client.reconnect());
420+
421+
// All reconnects should succeed and return the same promise (idempotent behavior)
422+
await Promise.all(reconnectPromises);
423+
424+
// Verify all clients are still connected
425+
clients.forEach((client) => {
426+
t.expect(client.state).toBe("connected");
427+
});
428+
429+
// Verify messages can still be sent and received after reconnect
430+
const messagePromise = new Promise((resolve) => {
431+
subscriptions[0].once("publication", (ctx) => resolve(ctx.data));
432+
});
433+
434+
const testPayload = { test: `message-after-reconnect-cycle-${cycle}` };
435+
await clients[0].publish(channel, JSON.stringify(testPayload));
436+
437+
const received = await messagePromise;
438+
t.expect(JSON.parse(received as string)).toEqual(testPayload);
439+
440+
// Wait a bit between cycles (simulating time between app suspensions)
441+
await new Promise((resolve) => setTimeout(resolve, 100));
442+
}
443+
444+
// Final verification: send one more message to ensure everything still works
445+
const finalMessagePromise = new Promise((resolve) => {
446+
subscriptions[numClients - 1].once("publication", (ctx) => resolve(ctx.data));
447+
});
448+
449+
const finalPayload = { test: "final-message-after-all-reconnects" };
450+
await clients[numClients - 1].publish(channel, JSON.stringify(finalPayload));
451+
452+
const finalReceived = await finalMessagePromise;
453+
t.expect(JSON.parse(finalReceived as string)).toEqual(finalPayload);
454+
});
455+
456+
t.test("should handle rapid successive reconnects without causing race conditions", async () => {
457+
const channel = `session:${uuid()}`;
458+
const client1 = new SharedCentrifuge(WEBSOCKET_URL, { websocket: WebSocket });
459+
const client2 = new SharedCentrifuge(WEBSOCKET_URL, { websocket: WebSocket });
460+
instances.push(client1, client2);
461+
462+
// Connect both clients
463+
const connectedPromise1 = waitFor(client1, "connected");
464+
const connectedPromise2 = waitFor(client2, "connected");
465+
client1.connect();
466+
await Promise.all([connectedPromise1, connectedPromise2]);
467+
468+
// Subscribe both to the same channel
469+
const sub1 = client1.newSubscription(channel);
470+
const sub2 = client2.newSubscription(channel);
471+
sub1.subscribe();
472+
sub2.subscribe();
473+
await Promise.all([new Promise((resolve) => sub1.once("subscribed", resolve)), new Promise((resolve) => sub2.once("subscribed", resolve))]);
474+
475+
// Fire off many reconnects in rapid succession from both clients
476+
const rapidReconnects: Promise<void>[] = [];
477+
for (let i = 0; i < 10; i++) {
478+
rapidReconnects.push(client1.reconnect());
479+
rapidReconnects.push(client2.reconnect());
480+
}
481+
482+
// All should complete successfully
483+
await Promise.all(rapidReconnects);
484+
485+
// Verify both clients are still connected
486+
t.expect(client1.state).toBe("connected");
487+
t.expect(client2.state).toBe("connected");
488+
489+
// Verify messaging still works
490+
const messagePromise = new Promise((resolve) => {
491+
sub2.once("publication", (ctx) => resolve(ctx.data));
492+
});
493+
494+
await client1.publish(channel, JSON.stringify({ test: "after-rapid-reconnects" }));
495+
const received = await messagePromise;
496+
t.expect(JSON.parse(received as string)).toEqual({ test: "after-rapid-reconnects" });
497+
});
376498
});

0 commit comments

Comments
 (0)