diff --git a/pkg/commands/psubscribe.test.ts b/pkg/commands/psubscribe.test.ts index 7b1bbd38..bf09e606 100644 --- a/pkg/commands/psubscribe.test.ts +++ b/pkg/commands/psubscribe.test.ts @@ -23,7 +23,7 @@ describe("Pattern Subscriber", () => { receivedMessages.push(message); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; const testMessage: TestMessage = { user: "testUser", @@ -52,7 +52,7 @@ describe("Pattern Subscriber", () => { messages.push(data); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; await redis.publish("chat:room1:messages", { msg: "Hello Room 1" }); await redis.publish("chat:room2:messages", { msg: "Hello Room 2" }); @@ -81,7 +81,7 @@ describe("Pattern Subscriber", () => { messages[pattern].push({ channel, message }); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; await redis.publish("user:123", { type: "user" }); await redis.publish("chat:room1", { type: "chat" }); @@ -108,7 +108,7 @@ describe("Pattern Subscriber", () => { messages[pattern].push({ channel, message }); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; // Initial messages await redis.publish("user:123", { msg: "user1" }); @@ -153,7 +153,8 @@ describe("Pattern Subscriber", () => { channelMessages.push(message); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await patternSubscriber.ready; + await channelSubscriber.ready; const testMessage: TestMessage = { msg: "Hello" }; await redis.publish("user:123", testMessage); diff --git a/pkg/commands/subscribe.test.ts b/pkg/commands/subscribe.test.ts index 6443ece4..ee5ef7bc 100644 --- a/pkg/commands/subscribe.test.ts +++ b/pkg/commands/subscribe.test.ts @@ -16,7 +16,7 @@ describe("Subscriber", () => { }); // Wait for subscription to establish - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; const testMessage = { user: "testUser", @@ -42,7 +42,7 @@ describe("Subscriber", () => { receivedMessages.push(data.message); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; const messages = [ { user: "user1", message: "First", timestamp: Date.now() }, @@ -72,7 +72,7 @@ describe("Subscriber", () => { channelMessages.push(data.message); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; const testMessage = { user: "testUser", @@ -100,7 +100,8 @@ describe("Subscriber", () => { subscriber1.on("message", (data) => messages1.push(data.message)); subscriber2.on("message", (data) => messages2.push(data.message)); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber1.ready; + await subscriber2.ready; const testMessage = { user: "testUser", @@ -131,7 +132,7 @@ describe("Subscriber", () => { messages[data.channel].push(data.message); }); - await new Promise((resolve) => setTimeout(resolve, 500)); + await subscriber.ready; // Send initial messages to both channels await redis.publish("channel1", { test: "before1" }); diff --git a/pkg/commands/subscribe.ts b/pkg/commands/subscribe.ts index 2886ec4a..f9f953df 100644 --- a/pkg/commands/subscribe.ts +++ b/pkg/commands/subscribe.ts @@ -40,6 +40,7 @@ export class Subscriber extends EventTarget { private subscriptions: Map; private client: Requester; private listeners: Map>>; + public ready: Promise; constructor(client: Requester, channels: string[], isPattern: boolean = false) { super(); @@ -47,13 +48,9 @@ export class Subscriber extends EventTarget { this.subscriptions = new Map(); this.listeners = new Map(); - for (const channel of channels) { - if (isPattern) { - this.subscribeToPattern(channel); - } else { - this.subscribeToChannel(channel); - } - } + this.ready = isPattern + ? Promise.all(channels.map((c) => this.subscribeToPattern(c))).then(() => {}) + : Promise.all(channels.map((c) => this.subscribeToChannel(c))).then(() => {}); } private subscribeToChannel(channel: string) { @@ -66,7 +63,7 @@ export class Subscriber extends EventTarget { }, }); - command.exec(this.client).catch((error) => { + const commandPromise = command.exec(this.client).catch((error) => { if (error.name !== "AbortError") { this.dispatchToListeners("error", error); } @@ -77,6 +74,8 @@ export class Subscriber extends EventTarget { controller, isPattern: false, }); + + return commandPromise; } private subscribeToPattern(pattern: string) { @@ -89,7 +88,7 @@ export class Subscriber extends EventTarget { }, }); - command.exec(this.client).catch((error) => { + const commandPromise = command.exec(this.client).catch((error) => { if (error.name !== "AbortError") { this.dispatchToListeners("error", error); } @@ -100,6 +99,8 @@ export class Subscriber extends EventTarget { controller, isPattern: true, }); + + return commandPromise; } private handleMessage(data: string, isPattern: boolean) {