Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 146 additions & 13 deletions packages/core/src/transport/websocket/index.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import { WebSocketTransport } from ".";

const WEBSOCKET_URL = "ws://localhost:8000/connection/websocket";

const testModes = [
{ name: "Shared Centrifuge Client", useSharedConnection: true },
{ name: "Single Centrifuge Client", useSharedConnection: false },
];

/**
* Simple in-memory KV store implementation for testing.
*/
Expand Down Expand Up @@ -34,13 +39,14 @@ const waitFor = (emitter: Emitter, event: string): Promise<any> => {
return new Promise((resolve) => emitter.once(event, resolve));
};

t.describe("WebSocketTransport", () => {
t.describe.each(testModes)("WebSocketTransport with $name", ({ useSharedConnection }) => {
t.describe("Constructor and Initialization", () => {
t.test("should create an instance of WebSocketTransport", async () => {
const transport = await WebSocketTransport.create({
url: WEBSOCKET_URL,
kvstore: new InMemoryKVStore(),
websocket: WebSocket,
useSharedConnection,
});
t.expect(transport).toBeInstanceOf(WebSocketTransport);
});
Expand All @@ -50,6 +56,7 @@ t.describe("WebSocketTransport", () => {
kvstore: new InMemoryKVStore(),
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
t.expect((transport as any).state).toBe("disconnected");
});
Expand All @@ -65,6 +72,7 @@ t.describe("WebSocketTransport", () => {
kvstore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
});

Expand Down Expand Up @@ -140,6 +148,7 @@ t.describe("WebSocketTransport", () => {
kvstore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await transport.connect();
});
Expand All @@ -166,8 +175,10 @@ t.describe("WebSocketTransport", () => {
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await publisher.connect();
await publisher.subscribe(channel); // Publisher needs to subscribe before publishing

const payload = `message from publisher ${Date.now()}`;
const messagePromise = waitFor(transport, "message");
Expand All @@ -182,6 +193,58 @@ t.describe("WebSocketTransport", () => {

await publisher.disconnect();
});

t.test("should not attach duplicate event listeners on repeated subscribe calls", async () => {
await transport.subscribe(channel);

// Set up a counter to track how many times the message handler is called
let messageCount = 0;
transport.on("message", () => {
messageCount++;
});

// Subscribe to the same channel again (should not attach new listeners)
await transport.subscribe(channel);

// Publish a single message using the publisher
const publisherKVStore = new InMemoryKVStore();
const publisher = await WebSocketTransport.create({
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await publisher.connect();
await publisher.subscribe(channel);

const payload = `test-message-${Date.now()}`;
const messagePromise = waitFor(transport, "message");

await publisher.publish(channel, payload);
await messagePromise;

// Wait a bit to ensure no duplicate messages arrive
await new Promise((resolve) => setTimeout(resolve, 200));

// Should only receive the message once, not twice
t.expect(messageCount).toBe(1);

await publisher.disconnect();
});

t.test("should not call _fetchHistory multiple times on repeated subscribe", async () => {
const fetchHistorySpy = t.vi.spyOn(transport as any, "_fetchHistory");

// First subscribe - should call _fetchHistory
await transport.subscribe(channel);
t.expect(fetchHistorySpy).toHaveBeenCalledTimes(1);

// Second subscribe to same channel - should NOT call _fetchHistory again
await transport.subscribe(channel);
t.expect(fetchHistorySpy).toHaveBeenCalledTimes(1); // Still 1, not 2

fetchHistorySpy.mockRestore();
});
});

t.describe("Message Publishing and Queuing", () => {
Expand All @@ -202,11 +265,13 @@ t.describe("WebSocketTransport", () => {
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
subscriber = await WebSocketTransport.create({
kvstore: subscriberKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});

// Subscriber must be connected and subscribed to receive messages
Expand All @@ -226,8 +291,9 @@ t.describe("WebSocketTransport", () => {
// Publish while disconnected, the promise should be pending
const publishPromise = publisher.publish(channel, payload);

// Now connect the publisher
// Now connect the publisher and subscribe to the channel
await publisher.connect();
await publisher.subscribe(channel);

// The promise should now resolve with true, and the message should be received
await t.expect(publishPromise).resolves.toBe(true);
Expand All @@ -241,12 +307,14 @@ t.describe("WebSocketTransport", () => {

// Start connecting the publisher
const connectPromise = publisher.connect();
t.expect((publisher as any).state).toBe("connecting");
// With SharedCentrifuge, if connection is already established, it may jump straight to "connected"
t.expect(["connecting", "connected"]).toContain((publisher as any).state);

// Publish while the connection is in progress
// Publish while the connection is in progress (or immediately after if already connected)
const publishPromise = publisher.publish(channel, payload);

await connectPromise;
await publisher.subscribe(channel); // Publisher needs to subscribe before publishing

await t.expect(publishPromise).resolves.toBe(true);
const received = await messagePromise;
Expand All @@ -273,6 +341,7 @@ t.describe("WebSocketTransport", () => {

// Connect to trigger sending the queue
await publisher.connect();
await publisher.subscribe(channel);
await Promise.all(publishPromises); // Wait for all publish promises to resolve

await messagesReceivedPromise; // Wait for all messages to be received
Expand All @@ -282,6 +351,7 @@ t.describe("WebSocketTransport", () => {

t.test("should send a message immediately when connected", async () => {
await publisher.connect();
await publisher.subscribe(channel);
const payload = "instant-message";
const messagePromise = waitFor(subscriber, "message");

Expand Down Expand Up @@ -320,11 +390,13 @@ t.describe("WebSocketTransport", () => {
kvstore: subscriberKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
rawPublisher = await WebSocketTransport.create({
kvstore: rawPublisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});

transports.push(subscriber, rawPublisher);
Expand Down Expand Up @@ -385,6 +457,7 @@ t.describe("WebSocketTransport", () => {
kvstore: subscriberKVStore, // Same storage
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(newSubscriber);

Expand Down Expand Up @@ -431,6 +504,58 @@ t.describe("WebSocketTransport", () => {
t.expect(error.message).toContain("Failed to parse incoming message");
});

t.test("should fetch history for new WebSocketTransport instances after shared subscription disconnects", async () => {
const channel = `session:${uuid()}`;
const kvstoreA = new InMemoryKVStore();
const kvstoreB = new InMemoryKVStore();

// Create first transport and publish some messages
const transportA = await WebSocketTransport.create({
kvstore: kvstoreA,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await transportA.connect();
await transportA.subscribe(channel);

const payloads = ["history-1", "history-2", "history-3"];
for (const payload of payloads) {
await transportA.publish(channel, payload);
}

// Wait a bit for messages to be published
await new Promise((resolve) => setTimeout(resolve, 100));

// Disconnect the first transport to ensure messages are in history
await transportA.disconnect();

// Create second transport with different kvstore (simulating different wallet instance)
const transportB = await WebSocketTransport.create({
kvstore: kvstoreB,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});

// Collect messages received by the second transport
const receivedMessages: string[] = [];
transportB.on("message", ({ data }) => {
receivedMessages.push(data);
});

await transportB.connect();
await transportB.subscribe(channel); // This should fetch history

// Wait for history to be fetched
await new Promise((resolve) => setTimeout(resolve, 200));

// Should have received the historical messages
t.expect(receivedMessages).toEqual(payloads);

await transportB.disconnect();
});

t.test("should handle multiple transport instances with independent per-channel nonces", async () => {
// Create two different channels to simulate wallet connecting to multiple dApps
const channelA = `session:${uuid()}`;
Expand All @@ -441,6 +566,7 @@ t.describe("WebSocketTransport", () => {
kvstore: rawPublisherKVStore, // Same kvstore as rawPublisher
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(publisher2);
await publisher2.connect();
Expand All @@ -451,6 +577,7 @@ t.describe("WebSocketTransport", () => {
kvstore: subscriber2KVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(subscriber2);
await subscriber2.connect();
Expand Down Expand Up @@ -491,6 +618,7 @@ t.describe("WebSocketTransport", () => {
kvstore: historicalPublisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(historicalPublisher);
await historicalPublisher.connect();
Expand All @@ -507,6 +635,7 @@ t.describe("WebSocketTransport", () => {
kvstore: subscriberKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(subscriber);

Expand Down Expand Up @@ -537,6 +666,7 @@ t.describe("WebSocketTransport", () => {
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
transports.push(publisher);
await publisher.connect();
Expand Down Expand Up @@ -568,6 +698,7 @@ t.describe("WebSocketTransport", () => {
kvstore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await transport.connect();
});
Expand All @@ -585,7 +716,7 @@ t.describe("WebSocketTransport", () => {
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify subscription exists
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();

// Verify storage has data (check storage directly)
const storage = (transport as any).storage;
Expand All @@ -599,7 +730,7 @@ t.describe("WebSocketTransport", () => {
await transport.clear(channel);

// Verify subscription is removed
t.expect((transport as any).centrifuge.getSubscription(channel)).toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channel)).toBeFalsy();

// Verify storage is cleared
t.expect(await kvstore.get(nonceKey)).toBeNull();
Expand All @@ -615,15 +746,15 @@ t.describe("WebSocketTransport", () => {
await transport.subscribe(channelB);

// Verify both subscriptions exist
t.expect((transport as any).centrifuge.getSubscription(channelA)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channelA)).not.toBeUndefined();
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeUndefined();

// Clear only channel A
await transport.clear(channelA);

// Verify only channel A subscription is removed
t.expect((transport as any).centrifuge.getSubscription(channelA)).toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channelA)).toBeFalsy();
t.expect((transport as any).centrifuge.getSubscription(channelB)).not.toBeUndefined();
});

t.test("should handle clearing non-subscribed channel gracefully", async () => {
Expand All @@ -634,10 +765,10 @@ t.describe("WebSocketTransport", () => {

// Verify it doesn't affect existing subscriptions
await transport.subscribe(channel);
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();

await transport.clear(nonSubscribedChannel);
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();
});

t.test("should clear channel with existing message history", async () => {
Expand All @@ -647,6 +778,7 @@ t.describe("WebSocketTransport", () => {
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await publisher.connect();

Expand Down Expand Up @@ -684,14 +816,15 @@ t.describe("WebSocketTransport", () => {

// This should work without issues
await t.expect(transport.subscribe(channel)).resolves.toBeUndefined();
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeNull();
t.expect((transport as any).centrifuge.getSubscription(channel)).not.toBeUndefined();

// Should be able to receive messages on the resubscribed channel
const publisherKVStore = new InMemoryKVStore();
const publisher = await WebSocketTransport.create({
kvstore: publisherKVStore,
url: WEBSOCKET_URL,
websocket: WebSocket,
useSharedConnection,
});
await publisher.connect();

Expand Down
Loading