Skip to content

Commit 391d8dc

Browse files
authored
fix(): event emitting (medusajs#14196)
* fix(): event emitting * Create rude-queens-deny.md * fix(): store subscriber should not be constraint * Update rude-queens-deny.md * Add tests to prevent regression
1 parent fe3c284 commit 391d8dc

File tree

6 files changed

+151
-8
lines changed

6 files changed

+151
-8
lines changed

.changeset/rude-queens-deny.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/medusa": patch
3+
"@medusajs/utils": patch
4+
---
5+
6+
fix(): event emitting
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
const { defineConfig, Modules } = require("@medusajs/utils")
2+
const os = require("os")
3+
const path = require("path")
4+
5+
const DB_HOST = process.env.DB_HOST
6+
const DB_USERNAME = process.env.DB_USERNAME
7+
const DB_PASSWORD = process.env.DB_PASSWORD
8+
const DB_NAME = process.env.DB_TEMP_NAME
9+
const DB_URL = `postgres://${DB_USERNAME}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}`
10+
process.env.DATABASE_URL = DB_URL
11+
process.env.LOG_LEVEL = "error"
12+
13+
module.exports = defineConfig({
14+
admin: {
15+
disable: true,
16+
},
17+
projectConfig: {
18+
http: {
19+
jwtSecret: "test",
20+
},
21+
workerMode: "server",
22+
redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379",
23+
},
24+
modules: {
25+
[Modules.EVENT_BUS]: {
26+
resolve: "@medusajs/event-bus-redis",
27+
options: {
28+
redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379",
29+
},
30+
},
31+
[Modules.FILE]: {
32+
resolve: "@medusajs/file",
33+
options: {
34+
providers: [
35+
{
36+
resolve: "@medusajs/file-local",
37+
id: "local",
38+
options: {
39+
upload_dir: path.join(os.tmpdir(), "uploads"),
40+
private_upload_dir: path.join(os.tmpdir(), "static"),
41+
},
42+
},
43+
],
44+
},
45+
},
46+
},
47+
})
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import { medusaIntegrationTestRunner } from "@medusajs/test-utils"
2+
import { IEventBusModuleService } from "@medusajs/types"
3+
import { composeMessage, Modules, PaymentWebhookEvents } from "@medusajs/utils"
4+
import path from "path"
5+
6+
jest.setTimeout(100000)
7+
8+
medusaIntegrationTestRunner({
9+
medusaConfigFile: path.join(
10+
__dirname,
11+
"../../__fixtures__/worker-mode-server"
12+
),
13+
testSuite: ({ getContainer }) => {
14+
describe("Event Bus - Server Worker Mode", () => {
15+
let eventBus: IEventBusModuleService
16+
17+
beforeAll(() => {
18+
eventBus = getContainer().resolve(Modules.EVENT_BUS)
19+
})
20+
21+
it("should register subscribers, queue events with subscribers, and skip events without subscribers", async () => {
22+
const subscribersMap = (eventBus as any).eventToSubscribersMap
23+
expect(subscribersMap).toBeDefined()
24+
expect(subscribersMap.size).toBeGreaterThan(0)
25+
26+
const paymentWebhookSubscribers = subscribersMap.get(
27+
PaymentWebhookEvents.WebhookReceived
28+
)
29+
expect(paymentWebhookSubscribers).toBeDefined()
30+
expect(paymentWebhookSubscribers.length).toBeGreaterThan(0)
31+
32+
const bullWorker = (eventBus as any).bullWorker_
33+
expect(bullWorker).toBeUndefined()
34+
35+
const testEventName = "test.server-mode-event"
36+
const subscriberMock = jest.fn()
37+
38+
eventBus.subscribe(testEventName, subscriberMock, {
39+
subscriberId: "test-server-mode-subscriber",
40+
})
41+
expect(subscribersMap.get(testEventName)).toBeDefined()
42+
43+
const queue = (eventBus as any).queue_
44+
const jobCountsBefore = await queue.getJobCounts()
45+
const totalJobsBefore =
46+
jobCountsBefore.waiting + jobCountsBefore.delayed
47+
48+
await eventBus.emit(
49+
composeMessage(testEventName, {
50+
data: { test: "data" },
51+
object: "test",
52+
source: "integration-test",
53+
action: "created",
54+
})
55+
)
56+
57+
const jobCountsAfterWithSubscriber = await queue.getJobCounts()
58+
const totalJobsAfterWithSubscriber =
59+
jobCountsAfterWithSubscriber.waiting +
60+
jobCountsAfterWithSubscriber.delayed
61+
62+
expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore)
63+
64+
await new Promise((resolve) => setTimeout(resolve, 500))
65+
66+
expect(subscriberMock).not.toHaveBeenCalled()
67+
68+
const eventWithNoSubscribers = "test.event-without-subscribers"
69+
expect(subscribersMap.get(eventWithNoSubscribers)).toBeUndefined()
70+
71+
const jobCountsBeforeNoSub = await queue.getJobCounts()
72+
const totalJobsBeforeNoSub =
73+
jobCountsBeforeNoSub.waiting + jobCountsBeforeNoSub.delayed
74+
75+
await eventBus.emit(
76+
composeMessage(eventWithNoSubscribers, {
77+
data: { test: "should-not-be-queued" },
78+
object: "test",
79+
source: "integration-test",
80+
action: "created",
81+
})
82+
)
83+
84+
const jobCountsAfterNoSub = await queue.getJobCounts()
85+
const totalJobsAfterNoSub =
86+
jobCountsAfterNoSub.waiting + jobCountsAfterNoSub.delayed
87+
88+
expect(totalJobsAfterNoSub).toBe(totalJobsBeforeNoSub)
89+
})
90+
})
91+
},
92+
})

integration-tests/http/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"@medusajs/core-flows": "workspace:*",
1717
"@medusajs/customer": "workspace:^",
1818
"@medusajs/event-bus-local": "workspace:*",
19+
"@medusajs/event-bus-redis": "workspace:*",
1920
"@medusajs/framework": "workspace:*",
2021
"@medusajs/fulfillment": "workspace:^",
2122
"@medusajs/fulfillment-manual": "workspace:^",

packages/core/utils/src/event-bus/index.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,6 @@ export abstract class AbstractEventBusModuleService
8787
subscriber: EventBusTypes.Subscriber,
8888
context?: EventBusTypes.SubscriberContext
8989
): this {
90-
if (!this.isWorkerMode) {
91-
return this
92-
}
93-
9490
if (typeof subscriber !== `function`) {
9591
throw new Error("Subscriber must be a function")
9692
}

packages/medusa/src/loaders/index.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,12 @@ async function loadEntrypoints(
9191
ContainerRegistrationKeys.CONFIG_MODULE
9292
)
9393

94+
// Subscribers should be loaded no matter the worker mode, simply they will never handle anything
95+
// since worker/shared instances only will have a running worker to process events.
96+
await subscribersLoader(plugins, container)
97+
9498
if (shouldLoadBackgroundProcessors(configModule)) {
95-
await promiseAll([
96-
subscribersLoader(plugins, container),
97-
jobsLoader(plugins, container),
98-
])
99+
await jobsLoader(plugins, container)
99100
}
100101

101102
if (isWorkerMode(configModule)) {

0 commit comments

Comments
 (0)