Skip to content

Commit 113f200

Browse files
authored
chore(event-bus-*): Do not emit if no subscribers: (medusajs#14084)
* chore(event-vus-*): Do not emit if no subscribers: * Create curly-apricots-double.md * add tests * align tests
1 parent 6c3ec52 commit 113f200

File tree

6 files changed

+373
-23
lines changed

6 files changed

+373
-23
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/event-bus-local": patch
3+
"@medusajs/event-bus-redis": patch
4+
---
5+
6+
chore(event-vus-*): Do not emit if no subscribers:

integration-tests/modules/__tests__/cart/store/cart.completion.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ medusaIntegrationTestRunner({
10211021
eventGroupId
10221022
) ?? []
10231023

1024-
expect(grouppedEventBefore).toHaveLength(1)
1024+
expect(grouppedEventBefore).toHaveLength(17)
10251025
expect(grouppedEventAfter).toHaveLength(0) // events have been compensated
10261026

10271027
expect(errors[0].error.message).toBe(

packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,21 @@ describe("LocalEventBusService", () => {
2626
})
2727

2828
it("should emit an event", async () => {
29+
eventBus.subscribe("eventName", () => Promise.resolve())
30+
2931
eventEmitter.emit = jest.fn((data) => data)
32+
eventEmitter.listenerCount = jest.fn((event) =>
33+
event === "eventName" ? 1 : 0
34+
)
3035

3136
await eventBus.emit({
3237
name: "eventName",
3338
data: { hi: "1234" },
3439
})
3540

41+
// Wait for async emission to complete
42+
await new Promise((resolve) => setImmediate(resolve))
43+
3644
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
3745
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
3846
data: { hi: "1234" },
@@ -41,12 +49,17 @@ describe("LocalEventBusService", () => {
4149

4250
expect(loggerMock.info).toHaveBeenCalledTimes(1)
4351
expect(loggerMock.info).toHaveBeenCalledWith(
44-
"Processing eventName which has undefined subscribers"
52+
"Processing eventName which has 1 subscribers"
4553
)
4654
})
4755

4856
it("should emit an event but not log anything if it is internal", async () => {
57+
eventBus.subscribe("eventName", () => Promise.resolve())
58+
4959
eventEmitter.emit = jest.fn((data) => data)
60+
eventEmitter.listenerCount = jest.fn((event) =>
61+
event === "eventName" ? 1 : 0
62+
)
5063

5164
await eventBus.emit({
5265
name: "eventName",
@@ -56,6 +69,9 @@ describe("LocalEventBusService", () => {
5669
},
5770
})
5871

72+
// Wait for async emission to complete
73+
await new Promise((resolve) => setImmediate(resolve))
74+
5975
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
6076
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
6177
data: { hi: "1234" },
@@ -74,6 +90,9 @@ describe("LocalEventBusService", () => {
7490
}
7591
)
7692

93+
// Wait for async emission to complete
94+
await new Promise((resolve) => setImmediate(resolve))
95+
7796
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
7897
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
7998
data: { hi: "1234" },
@@ -84,13 +103,22 @@ describe("LocalEventBusService", () => {
84103
})
85104

86105
it("should emit multiple events", async () => {
106+
eventBus.subscribe("event-1", () => Promise.resolve())
107+
eventBus.subscribe("event-2", () => Promise.resolve())
108+
87109
eventEmitter.emit = jest.fn((data) => data)
110+
eventEmitter.listenerCount = jest.fn((event) =>
111+
event === "event-1" || event === "event-2" ? 1 : 0
112+
)
88113

89114
await eventBus.emit([
90115
{ name: "event-1", data: { hi: "1234" } },
91116
{ name: "event-2", data: { hi: "5678" } },
92117
])
93118

119+
// Wait for async emission to complete
120+
await new Promise((resolve) => setImmediate(resolve))
121+
94122
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
95123
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
96124
data: { hi: "1234" },
@@ -161,7 +189,13 @@ describe("LocalEventBusService", () => {
161189
})
162190

163191
it("should release events when requested with eventGroupId", async () => {
192+
eventBus.subscribe("event-1", () => Promise.resolve())
193+
eventBus.subscribe("event-2", () => Promise.resolve())
194+
164195
eventEmitter.emit = jest.fn((data) => data)
196+
eventEmitter.listenerCount = jest.fn((event) =>
197+
event === "event-1" || event === "event-2" ? 1 : 0
198+
)
165199

166200
await eventBus.emit([
167201
{
@@ -187,6 +221,9 @@ describe("LocalEventBusService", () => {
187221
{ name: "event-1", data: { test: "1" } },
188222
])
189223

224+
// Wait for async emission to complete
225+
await new Promise((resolve) => setImmediate(resolve))
226+
190227
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
191228
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
192229
data: { test: "1" },
@@ -204,6 +241,9 @@ describe("LocalEventBusService", () => {
204241
eventEmitter.emit = jest.fn((data) => data)
205242
await eventBus.releaseGroupedEvents("group-1")
206243

244+
// Wait for async emission to complete
245+
await new Promise((resolve) => setImmediate(resolve))
246+
207247
expect(
208248
(eventBus as any).groupedEventsMap_.get("group-1")
209249
).not.toBeDefined()
@@ -254,5 +294,139 @@ describe("LocalEventBusService", () => {
254294
expect(getMap().get("group-2")).not.toBeDefined()
255295
})
256296
})
297+
298+
describe("Events without subscribers", () => {
299+
beforeEach(() => {
300+
jest.clearAllMocks()
301+
302+
eventBus = new LocalEventBusService(moduleDeps as any, {}, {} as any)
303+
eventEmitter = (eventBus as any).eventEmitter_
304+
})
305+
306+
it("should not emit events when there are no subscribers", async () => {
307+
eventEmitter.emit = jest.fn((data) => data)
308+
eventEmitter.listenerCount = jest.fn(() => 0)
309+
310+
await eventBus.emit({
311+
name: "eventWithoutSubscribers",
312+
data: { test: "data" },
313+
})
314+
315+
expect(eventEmitter.emit).not.toHaveBeenCalled()
316+
})
317+
318+
it("should still call interceptors even when there are no subscribers", async () => {
319+
const callInterceptorsSpy = jest.spyOn(
320+
eventBus as any,
321+
"callInterceptors"
322+
)
323+
324+
eventEmitter.emit = jest.fn((data) => data)
325+
eventEmitter.listenerCount = jest.fn(() => 0)
326+
327+
await eventBus.emit({
328+
name: "eventWithoutSubscribers",
329+
data: { test: "data" },
330+
})
331+
332+
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
333+
expect(callInterceptorsSpy).toHaveBeenCalledWith(
334+
expect.objectContaining({
335+
name: "eventWithoutSubscribers",
336+
data: { test: "data" },
337+
}),
338+
{ isGrouped: false }
339+
)
340+
341+
expect(eventEmitter.emit).not.toHaveBeenCalled()
342+
343+
callInterceptorsSpy.mockRestore()
344+
})
345+
346+
it("should emit events with wildcard subscriber", async () => {
347+
eventBus.subscribe("*", () => Promise.resolve())
348+
349+
eventEmitter.emit = jest.fn((data) => data)
350+
eventEmitter.listenerCount = jest.fn((event) => (event === "*" ? 1 : 0))
351+
352+
await eventBus.emit({
353+
name: "anyEvent",
354+
data: { test: "data" },
355+
})
356+
357+
// Wait for async emission to complete
358+
await new Promise((resolve) => setImmediate(resolve))
359+
360+
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
361+
expect(eventEmitter.emit).toHaveBeenCalledWith("*", {
362+
data: { test: "data" },
363+
name: "anyEvent",
364+
})
365+
})
366+
367+
it("should not emit grouped events when releasing if there are no subscribers", async () => {
368+
eventEmitter.emit = jest.fn((data) => data)
369+
eventEmitter.listenerCount = jest.fn(() => 0)
370+
371+
await eventBus.emit({
372+
name: "grouped-event-no-sub",
373+
data: { hi: "1234" },
374+
metadata: { eventGroupId: "test-group-no-sub" },
375+
})
376+
377+
expect(
378+
(eventBus as any).groupedEventsMap_.get("test-group-no-sub")
379+
).toHaveLength(1)
380+
expect(eventEmitter.emit).not.toHaveBeenCalled()
381+
382+
jest.clearAllMocks()
383+
eventEmitter.emit = jest.fn((data) => data)
384+
385+
await eventBus.releaseGroupedEvents("test-group-no-sub")
386+
387+
expect(eventEmitter.emit).not.toHaveBeenCalled()
388+
389+
expect(
390+
(eventBus as any).groupedEventsMap_.get("test-group-no-sub")
391+
).not.toBeDefined()
392+
})
393+
394+
it("should still call interceptors for grouped events without subscribers", async () => {
395+
eventEmitter.emit = jest.fn((data) => data)
396+
eventEmitter.listenerCount = jest.fn(() => 0)
397+
398+
await eventBus.emit({
399+
name: "grouped-event-no-sub-2",
400+
data: { hi: "1234" },
401+
metadata: { eventGroupId: "test-group-no-sub-2" },
402+
})
403+
404+
expect(
405+
(eventBus as any).groupedEventsMap_.get("test-group-no-sub-2")
406+
).toHaveLength(1)
407+
408+
jest.clearAllMocks()
409+
eventEmitter.emit = jest.fn((data) => data)
410+
411+
const callInterceptorsSpy = jest.spyOn(
412+
eventBus as any,
413+
"callInterceptors"
414+
)
415+
416+
await eventBus.releaseGroupedEvents("test-group-no-sub-2")
417+
418+
expect(callInterceptorsSpy).toHaveBeenCalledTimes(1)
419+
expect(callInterceptorsSpy).toHaveBeenCalledWith(
420+
expect.objectContaining({
421+
name: "grouped-event-no-sub-2",
422+
}),
423+
{ isGrouped: true, eventGroupId: "test-group-no-sub-2" }
424+
)
425+
426+
expect(eventEmitter.emit).not.toHaveBeenCalled()
427+
428+
callInterceptorsSpy.mockRestore()
429+
})
430+
})
257431
})
258432
})

packages/modules/event-bus-local/src/services/event-bus-local.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,6 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
5858
const eventListenersCount = this.eventEmitter_.listenerCount(
5959
eventData.name
6060
)
61-
const startSubscribersCount = this.eventEmitter_.listenerCount("*")
62-
63-
if (eventListenersCount === 0 && startSubscribersCount === 0) {
64-
continue
65-
}
6661

6762
if (!options.internal && !eventData.options?.internal) {
6863
this.logger_?.info(
@@ -93,11 +88,18 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
9388
const options_ = eventData.options as { delay: number }
9489
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
9590

91+
const eventListenersCount = this.eventEmitter_.listenerCount(
92+
eventData.name
93+
)
94+
9695
delay(options_?.delay).then(async () => {
9796
// Call interceptors before emitting
9897
void this.callInterceptors(eventData, { isGrouped: false })
9998

100-
this.eventEmitter_.emit(eventData.name, eventBody)
99+
if (eventListenersCount) {
100+
this.eventEmitter_.emit(eventData.name, eventBody)
101+
}
102+
101103
if (hasStarSubscriber) {
102104
this.eventEmitter_.emit("*", eventBody)
103105
}
@@ -120,19 +122,25 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
120122
async releaseGroupedEvents(eventGroupId: string) {
121123
let groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
122124
groupedEvents = JSON.parse(JSON.stringify(groupedEvents))
125+
123126
const hasStarSubscriber = this.eventEmitter_.listenerCount("*") > 0
124127

125128
for (const event of groupedEvents) {
126129
const { options, ...eventBody } = event
127130

131+
const eventListenersCount = this.eventEmitter_.listenerCount(event.name)
132+
128133
const options_ = options as { delay: number }
129134
const delay = (ms?: number) => (ms ? setTimeout(ms) : Promise.resolve())
130135

131136
delay(options_?.delay).then(async () => {
132137
// Call interceptors before emitting grouped events
133138
void this.callInterceptors(event, { isGrouped: true, eventGroupId })
134139

135-
this.eventEmitter_.emit(event.name, eventBody)
140+
if (eventListenersCount) {
141+
this.eventEmitter_.emit(event.name, eventBody)
142+
}
143+
136144
if (hasStarSubscriber) {
137145
this.eventEmitter_.emit("*", eventBody)
138146
}

0 commit comments

Comments
 (0)