Skip to content

Commit 6afb38b

Browse files
authored
Feat: Shared ws connection (#49)
1 parent 3467883 commit 6afb38b

File tree

4 files changed

+925
-35
lines changed

4 files changed

+925
-35
lines changed

packages/core/src/transport/websocket/index.integration.test.ts

Lines changed: 146 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import { WebSocketTransport } from ".";
77

88
const WEBSOCKET_URL = "ws://localhost:8000/connection/websocket";
99

10+
const testModes = [
11+
{ name: "Shared Centrifuge Client", useSharedConnection: true },
12+
{ name: "Single Centrifuge Client", useSharedConnection: false },
13+
];
14+
1015
/**
1116
* Simple in-memory KV store implementation for testing.
1217
*/
@@ -34,13 +39,14 @@ const waitFor = (emitter: Emitter, event: string): Promise<any> => {
3439
return new Promise((resolve) => emitter.once(event, resolve));
3540
};
3641

37-
t.describe("WebSocketTransport", () => {
42+
t.describe.each(testModes)("WebSocketTransport with $name", ({ useSharedConnection }) => {
3843
t.describe("Constructor and Initialization", () => {
3944
t.test("should create an instance of WebSocketTransport", async () => {
4045
const transport = await WebSocketTransport.create({
4146
url: WEBSOCKET_URL,
4247
kvstore: new InMemoryKVStore(),
4348
websocket: WebSocket,
49+
useSharedConnection,
4450
});
4551
t.expect(transport).toBeInstanceOf(WebSocketTransport);
4652
});
@@ -50,6 +56,7 @@ t.describe("WebSocketTransport", () => {
5056
kvstore: new InMemoryKVStore(),
5157
url: WEBSOCKET_URL,
5258
websocket: WebSocket,
59+
useSharedConnection,
5360
});
5461
t.expect((transport as any).state).toBe("disconnected");
5562
});
@@ -65,6 +72,7 @@ t.describe("WebSocketTransport", () => {
6572
kvstore,
6673
url: WEBSOCKET_URL,
6774
websocket: WebSocket,
75+
useSharedConnection,
6876
});
6977
});
7078

@@ -140,6 +148,7 @@ t.describe("WebSocketTransport", () => {
140148
kvstore,
141149
url: WEBSOCKET_URL,
142150
websocket: WebSocket,
151+
useSharedConnection,
143152
});
144153
await transport.connect();
145154
});
@@ -166,8 +175,10 @@ t.describe("WebSocketTransport", () => {
166175
kvstore: publisherKVStore,
167176
url: WEBSOCKET_URL,
168177
websocket: WebSocket,
178+
useSharedConnection,
169179
});
170180
await publisher.connect();
181+
await publisher.subscribe(channel); // Publisher needs to subscribe before publishing
171182

172183
const payload = `message from publisher ${Date.now()}`;
173184
const messagePromise = waitFor(transport, "message");
@@ -182,6 +193,58 @@ t.describe("WebSocketTransport", () => {
182193

183194
await publisher.disconnect();
184195
});
196+
197+
t.test("should not attach duplicate event listeners on repeated subscribe calls", async () => {
198+
await transport.subscribe(channel);
199+
200+
// Set up a counter to track how many times the message handler is called
201+
let messageCount = 0;
202+
transport.on("message", () => {
203+
messageCount++;
204+
});
205+
206+
// Subscribe to the same channel again (should not attach new listeners)
207+
await transport.subscribe(channel);
208+
209+
// Publish a single message using the publisher
210+
const publisherKVStore = new InMemoryKVStore();
211+
const publisher = await WebSocketTransport.create({
212+
kvstore: publisherKVStore,
213+
url: WEBSOCKET_URL,
214+
websocket: WebSocket,
215+
useSharedConnection,
216+
});
217+
await publisher.connect();
218+
await publisher.subscribe(channel);
219+
220+
const payload = `test-message-${Date.now()}`;
221+
const messagePromise = waitFor(transport, "message");
222+
223+
await publisher.publish(channel, payload);
224+
await messagePromise;
225+
226+
// Wait a bit to ensure no duplicate messages arrive
227+
await new Promise((resolve) => setTimeout(resolve, 200));
228+
229+
// Should only receive the message once, not twice
230+
t.expect(messageCount).toBe(1);
231+
232+
await publisher.disconnect();
233+
});
234+
235+
t.test("should not call _fetchHistory multiple times on repeated subscribe", async () => {
236+
const fetchHistorySpy = t.vi.spyOn(transport as any, "_fetchHistory");
237+
238+
// First subscribe - should call _fetchHistory
239+
await transport.subscribe(channel);
240+
t.expect(fetchHistorySpy).toHaveBeenCalledTimes(1);
241+
242+
// Second subscribe to same channel - should NOT call _fetchHistory again
243+
await transport.subscribe(channel);
244+
t.expect(fetchHistorySpy).toHaveBeenCalledTimes(1); // Still 1, not 2
245+
246+
fetchHistorySpy.mockRestore();
247+
});
185248
});
186249

187250
t.describe("Message Publishing and Queuing", () => {
@@ -202,11 +265,13 @@ t.describe("WebSocketTransport", () => {
202265
kvstore: publisherKVStore,
203266
url: WEBSOCKET_URL,
204267
websocket: WebSocket,
268+
useSharedConnection,
205269
});
206270
subscriber = await WebSocketTransport.create({
207271
kvstore: subscriberKVStore,
208272
url: WEBSOCKET_URL,
209273
websocket: WebSocket,
274+
useSharedConnection,
210275
});
211276

212277
// Subscriber must be connected and subscribed to receive messages
@@ -226,8 +291,9 @@ t.describe("WebSocketTransport", () => {
226291
// Publish while disconnected, the promise should be pending
227292
const publishPromise = publisher.publish(channel, payload);
228293

229-
// Now connect the publisher
294+
// Now connect the publisher and subscribe to the channel
230295
await publisher.connect();
296+
await publisher.subscribe(channel);
231297

232298
// The promise should now resolve with true, and the message should be received
233299
await t.expect(publishPromise).resolves.toBe(true);
@@ -241,12 +307,14 @@ t.describe("WebSocketTransport", () => {
241307

242308
// Start connecting the publisher
243309
const connectPromise = publisher.connect();
244-
t.expect((publisher as any).state).toBe("connecting");
310+
// With SharedCentrifuge, if connection is already established, it may jump straight to "connected"
311+
t.expect(["connecting", "connected"]).toContain((publisher as any).state);
245312

246-
// Publish while the connection is in progress
313+
// Publish while the connection is in progress (or immediately after if already connected)
247314
const publishPromise = publisher.publish(channel, payload);
248315

249316
await connectPromise;
317+
await publisher.subscribe(channel); // Publisher needs to subscribe before publishing
250318

251319
await t.expect(publishPromise).resolves.toBe(true);
252320
const received = await messagePromise;
@@ -273,6 +341,7 @@ t.describe("WebSocketTransport", () => {
273341

274342
// Connect to trigger sending the queue
275343
await publisher.connect();
344+
await publisher.subscribe(channel);
276345
await Promise.all(publishPromises); // Wait for all publish promises to resolve
277346

278347
await messagesReceivedPromise; // Wait for all messages to be received
@@ -282,6 +351,7 @@ t.describe("WebSocketTransport", () => {
282351

283352
t.test("should send a message immediately when connected", async () => {
284353
await publisher.connect();
354+
await publisher.subscribe(channel);
285355
const payload = "instant-message";
286356
const messagePromise = waitFor(subscriber, "message");
287357

@@ -320,11 +390,13 @@ t.describe("WebSocketTransport", () => {
320390
kvstore: subscriberKVStore,
321391
url: WEBSOCKET_URL,
322392
websocket: WebSocket,
393+
useSharedConnection,
323394
});
324395
rawPublisher = await WebSocketTransport.create({
325396
kvstore: rawPublisherKVStore,
326397
url: WEBSOCKET_URL,
327398
websocket: WebSocket,
399+
useSharedConnection,
328400
});
329401

330402
transports.push(subscriber, rawPublisher);
@@ -385,6 +457,7 @@ t.describe("WebSocketTransport", () => {
385457
kvstore: subscriberKVStore, // Same storage
386458
url: WEBSOCKET_URL,
387459
websocket: WebSocket,
460+
useSharedConnection,
388461
});
389462
transports.push(newSubscriber);
390463

@@ -431,6 +504,58 @@ t.describe("WebSocketTransport", () => {
431504
t.expect(error.message).toContain("Failed to parse incoming message");
432505
});
433506

507+
t.test("should fetch history for new WebSocketTransport instances after shared subscription disconnects", async () => {
508+
const channel = `session:${uuid()}`;
509+
const kvstoreA = new InMemoryKVStore();
510+
const kvstoreB = new InMemoryKVStore();
511+
512+
// Create first transport and publish some messages
513+
const transportA = await WebSocketTransport.create({
514+
kvstore: kvstoreA,
515+
url: WEBSOCKET_URL,
516+
websocket: WebSocket,
517+
useSharedConnection,
518+
});
519+
await transportA.connect();
520+
await transportA.subscribe(channel);
521+
522+
const payloads = ["history-1", "history-2", "history-3"];
523+
for (const payload of payloads) {
524+
await transportA.publish(channel, payload);
525+
}
526+
527+
// Wait a bit for messages to be published
528+
await new Promise((resolve) => setTimeout(resolve, 100));
529+
530+
// Disconnect the first transport to ensure messages are in history
531+
await transportA.disconnect();
532+
533+
// Create second transport with different kvstore (simulating different wallet instance)
534+
const transportB = await WebSocketTransport.create({
535+
kvstore: kvstoreB,
536+
url: WEBSOCKET_URL,
537+
websocket: WebSocket,
538+
useSharedConnection,
539+
});
540+
541+
// Collect messages received by the second transport
542+
const receivedMessages: string[] = [];
543+
transportB.on("message", ({ data }) => {
544+
receivedMessages.push(data);
545+
});
546+
547+
await transportB.connect();
548+
await transportB.subscribe(channel); // This should fetch history
549+
550+
// Wait for history to be fetched
551+
await new Promise((resolve) => setTimeout(resolve, 200));
552+
553+
// Should have received the historical messages
554+
t.expect(receivedMessages).toEqual(payloads);
555+
556+
await transportB.disconnect();
557+
});
558+
434559
t.test("should handle multiple transport instances with independent per-channel nonces", async () => {
435560
// Create two different channels to simulate wallet connecting to multiple dApps
436561
const channelA = `session:${uuid()}`;
@@ -441,6 +566,7 @@ t.describe("WebSocketTransport", () => {
441566
kvstore: rawPublisherKVStore, // Same kvstore as rawPublisher
442567
url: WEBSOCKET_URL,
443568
websocket: WebSocket,
569+
useSharedConnection,
444570
});
445571
transports.push(publisher2);
446572
await publisher2.connect();
@@ -451,6 +577,7 @@ t.describe("WebSocketTransport", () => {
451577
kvstore: subscriber2KVStore,
452578
url: WEBSOCKET_URL,
453579
websocket: WebSocket,
580+
useSharedConnection,
454581
});
455582
transports.push(subscriber2);
456583
await subscriber2.connect();
@@ -491,6 +618,7 @@ t.describe("WebSocketTransport", () => {
491618
kvstore: historicalPublisherKVStore,
492619
url: WEBSOCKET_URL,
493620
websocket: WebSocket,
621+
useSharedConnection,
494622
});
495623
transports.push(historicalPublisher);
496624
await historicalPublisher.connect();
@@ -507,6 +635,7 @@ t.describe("WebSocketTransport", () => {
507635
kvstore: subscriberKVStore,
508636
url: WEBSOCKET_URL,
509637
websocket: WebSocket,
638+
useSharedConnection,
510639
});
511640
transports.push(subscriber);
512641

@@ -537,6 +666,7 @@ t.describe("WebSocketTransport", () => {
537666
kvstore: publisherKVStore,
538667
url: WEBSOCKET_URL,
539668
websocket: WebSocket,
669+
useSharedConnection,
540670
});
541671
transports.push(publisher);
542672
await publisher.connect();
@@ -568,6 +698,7 @@ t.describe("WebSocketTransport", () => {
568698
kvstore,
569699
url: WEBSOCKET_URL,
570700
websocket: WebSocket,
701+
useSharedConnection,
571702
});
572703
await transport.connect();
573704
});
@@ -585,7 +716,7 @@ t.describe("WebSocketTransport", () => {
585716
await new Promise((resolve) => setTimeout(resolve, 100));
586717

587718
// Verify subscription exists
588-
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
719+
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();
589720

590721
// Verify storage has data (check storage directly)
591722
const storage = (transport as any).storage;
@@ -599,7 +730,7 @@ t.describe("WebSocketTransport", () => {
599730
await transport.clear(channel);
600731

601732
// Verify subscription is removed
602-
t.expect((transport as any).centrifuge.getSubscription(channel)).toBeNull();
733+
t.expect((transport as any).centrifuge.getSubscription(channel)).toBeFalsy();
603734

604735
// Verify storage is cleared
605736
t.expect(await kvstore.get(nonceKey)).toBeNull();
@@ -615,15 +746,15 @@ t.describe("WebSocketTransport", () => {
615746
await transport.subscribe(channelB);
616747

617748
// Verify both subscriptions exist
618-
t.expect((transport as any).centrifuge.getSubscription(channelA)).not.toBeNull();
619-
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeNull();
749+
t.expect((transport as any).centrifuge.getSubscription(channelA)).not.toBeUndefined();
750+
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeUndefined();
620751

621752
// Clear only channel A
622753
await transport.clear(channelA);
623754

624755
// Verify only channel A subscription is removed
625-
t.expect((transport as any).centrifuge.getSubscription(channelA)).toBeNull();
626-
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeNull();
756+
t.expect((transport as any).centrifuge.getSubscription(channelA)).toBeFalsy();
757+
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeUndefined();
627758
});
628759

629760
t.test("should handle clearing non-subscribed channel gracefully", async () => {
@@ -634,10 +765,10 @@ t.describe("WebSocketTransport", () => {
634765

635766
// Verify it doesn't affect existing subscriptions
636767
await transport.subscribe(channel);
637-
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
768+
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();
638769

639770
await transport.clear(nonSubscribedChannel);
640-
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
771+
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();
641772
});
642773

643774
t.test("should clear channel with existing message history", async () => {
@@ -647,6 +778,7 @@ t.describe("WebSocketTransport", () => {
647778
kvstore: publisherKVStore,
648779
url: WEBSOCKET_URL,
649780
websocket: WebSocket,
781+
useSharedConnection,
650782
});
651783
await publisher.connect();
652784

@@ -684,14 +816,15 @@ t.describe("WebSocketTransport", () => {
684816

685817
// This should work without issues
686818
await t.expect(transport.subscribe(channel)).resolves.toBeUndefined();
687-
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
819+
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();
688820

689821
// Should be able to receive messages on the resubscribed channel
690822
const publisherKVStore = new InMemoryKVStore();
691823
const publisher = await WebSocketTransport.create({
692824
kvstore: publisherKVStore,
693825
url: WEBSOCKET_URL,
694826
websocket: WebSocket,
827+
useSharedConnection,
695828
});
696829
await publisher.connect();
697830

0 commit comments

Comments
 (0)