Skip to content

Commit fc49253

Browse files
authored
feat(core, event-bus): Compensate emit event step utility (medusajs#13281)
* feat(core, event-bus): Compensate emit event step utility * tests * Update changeset to remove integration-tests-modules Removed integration-tests-modules from changeset. * revert test script
1 parent 73a25ab commit fc49253

File tree

11 files changed

+547
-22
lines changed

11 files changed

+547
-22
lines changed

.changeset/rotten-beds-boil.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@medusajs/event-bus-local": patch
3+
"@medusajs/event-bus-redis": patch
4+
"@medusajs/core-flows": patch
5+
"@medusajs/types": patch
6+
"@medusajs/utils": patch
7+
---
8+
9+
feat(core, event-bus): Compensate emit event step utility

integration-tests/modules/__tests__/cart/store/cart.completion.ts

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { medusaIntegrationTestRunner } from "@medusajs/test-utils"
1212
import {
1313
ICartModuleService,
1414
ICustomerModuleService,
15+
IEventBusModuleService,
1516
IFulfillmentModuleService,
1617
IInventoryService,
1718
IPaymentModuleService,
@@ -20,6 +21,7 @@ import {
2021
IRegionModuleService,
2122
ISalesChannelModuleService,
2223
IStockLocationService,
24+
Message,
2325
} from "@medusajs/types"
2426
import {
2527
ContainerRegistrationKeys,
@@ -60,6 +62,7 @@ medusaIntegrationTestRunner({
6062
let defaultRegion
6163
let customer, storeHeadersWithCustomer
6264
let setPricingContextHook: any
65+
let eventBus: IEventBusModuleService
6366

6467
beforeAll(async () => {
6568
appContainer = getContainer()
@@ -70,6 +73,7 @@ medusaIntegrationTestRunner({
7073
productModule = appContainer.resolve(Modules.PRODUCT)
7174
pricingModule = appContainer.resolve(Modules.PRICING)
7275
paymentModule = appContainer.resolve(Modules.PAYMENT)
76+
eventBus = appContainer.resolve(Modules.EVENT_BUS)
7377
fulfillmentModule = appContainer.resolve(Modules.FULFILLMENT)
7478
inventoryModule = appContainer.resolve(Modules.INVENTORY)
7579
stockLocationModule = appContainer.resolve(Modules.STOCK_LOCATION)
@@ -747,6 +751,7 @@ medusaIntegrationTestRunner({
747751
paymentSession.id
748752
)
749753
})
754+
750755
it("should complete cart when payment webhook and storefront are called in simultaneously", async () => {
751756
const salesChannel = await scModuleService.createSalesChannels({
752757
name: "Webshop",
@@ -876,6 +881,142 @@ medusaIntegrationTestRunner({
876881
expect(fullOrder.payment_collections[0].captured_amount).toBe(3000)
877882
expect(fullOrder.payment_collections[0].status).toBe("completed")
878883
})
884+
885+
it("should clear events when complete cart fails after emitting events", async () => {
886+
const salesChannel = await scModuleService.createSalesChannels({
887+
name: "Webshop",
888+
})
889+
890+
const location = await stockLocationModule.createStockLocations({
891+
name: "Warehouse",
892+
})
893+
894+
const region = await regionModuleService.createRegions({
895+
name: "US",
896+
currency_code: "usd",
897+
})
898+
899+
let cart = await cartModuleService.createCarts({
900+
currency_code: "usd",
901+
sales_channel_id: salesChannel.id,
902+
region_id: region.id,
903+
})
904+
905+
await remoteLink.create([
906+
{
907+
[Modules.SALES_CHANNEL]: {
908+
sales_channel_id: salesChannel.id,
909+
},
910+
[Modules.STOCK_LOCATION]: {
911+
stock_location_id: location.id,
912+
},
913+
},
914+
])
915+
916+
cart = await cartModuleService.retrieveCart(cart.id, {
917+
select: ["id", "region_id", "currency_code", "sales_channel_id"],
918+
})
919+
920+
await addToCartWorkflow(appContainer).run({
921+
input: {
922+
items: [
923+
{
924+
title: "Test item",
925+
subtitle: "Test subtitle",
926+
thumbnail: "some-url",
927+
requires_shipping: false,
928+
is_discountable: false,
929+
is_tax_inclusive: false,
930+
unit_price: 3000,
931+
metadata: {
932+
foo: "bar",
933+
},
934+
quantity: 1,
935+
},
936+
{
937+
title: "zero price item",
938+
subtitle: "zero price item",
939+
thumbnail: "some-url",
940+
requires_shipping: false,
941+
is_discountable: false,
942+
is_tax_inclusive: false,
943+
unit_price: 0,
944+
quantity: 1,
945+
},
946+
],
947+
cart_id: cart.id,
948+
},
949+
})
950+
951+
cart = await cartModuleService.retrieveCart(cart.id, {
952+
relations: ["items"],
953+
})
954+
955+
await createPaymentCollectionForCartWorkflow(appContainer).run({
956+
input: {
957+
cart_id: cart.id,
958+
},
959+
})
960+
961+
const [paymentCollection] =
962+
await paymentModule.listPaymentCollections({})
963+
964+
await createPaymentSessionsWorkflow(appContainer).run({
965+
input: {
966+
payment_collection_id: paymentCollection.id,
967+
provider_id: "pp_system_default",
968+
context: {},
969+
data: {},
970+
},
971+
})
972+
973+
let grouppedEventBefore: Message[] = []
974+
let eventGroupId!: string
975+
976+
/**
977+
* Register order.placed listener to trigger the event group
978+
* registration and be able to check the event group during
979+
* the workflow execution against it after compensation
980+
*/
981+
982+
eventBus.subscribe("order.placed", async () => {
983+
// noop
984+
})
985+
986+
const workflow = completeCartWorkflow(appContainer)
987+
988+
workflow.addAction("throw", {
989+
invoke: async function failStep({ context }) {
990+
eventGroupId = context!.eventGroupId!
991+
grouppedEventBefore = (
992+
(eventBus as any).groupedEventsMap_ as Map<string, any>
993+
).get(context!.eventGroupId!)
994+
995+
throw new Error(
996+
`Failed to do something before ending complete cart workflow`
997+
)
998+
},
999+
})
1000+
1001+
const { errors } = await workflow.run({
1002+
input: {
1003+
id: cart.id,
1004+
},
1005+
throwOnError: false,
1006+
})
1007+
1008+
const grouppedEventAfter =
1009+
((eventBus as any).groupedEventsMap_ as Map<string, any>).get(
1010+
eventGroupId
1011+
) ?? []
1012+
1013+
expect(grouppedEventBefore).toHaveLength(1)
1014+
expect(grouppedEventAfter).toHaveLength(0) // events have been compensated
1015+
1016+
expect(errors[0].error.message).toBe(
1017+
"Failed to do something before ending complete cart workflow"
1018+
)
1019+
})
8791020
})
8801021
})
8811022
},

packages/core/core-flows/src/common/steps/emit-event.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
import { Modules } from "@medusajs/framework/utils"
66
import {
77
StepExecutionContext,
8+
StepResponse,
89
createStep,
910
} from "@medusajs/framework/workflows-sdk"
1011

@@ -85,6 +86,25 @@ export const emitEventStep = createStep(
8586
}
8687

8788
await eventBus.emit(message)
89+
90+
return new StepResponse({
91+
eventGroupId: context.eventGroupId,
92+
eventName: input.eventName,
93+
})
8894
},
89-
async (data: void) => {}
95+
async (data, context) => {
96+
if (!data || !data?.eventGroupId) {
97+
return
98+
}
99+
100+
const { container } = context
101+
102+
const eventBus: IEventBusModuleService = container.resolve(
103+
Modules.EVENT_BUS
104+
)
105+
106+
await eventBus.clearGroupedEvents(data!.eventGroupId, {
107+
eventNames: [data!.eventName],
108+
})
109+
}
90110
)

packages/core/types/src/event-bus/event-bus-module.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ import { Message, Subscriber, SubscriberContext } from "./common"
33
export interface IEventBusModuleService {
44
/**
55
* This method emits one or more events. Subscribers listening to the event(s) are executed asynchronously.
6-
*
6+
*
77
* @param data - The details of the events to emit.
88
* @param options - Additional options for the event.
9-
*
9+
*
1010
* @example
11-
* await eventModuleService.emit({
12-
* name: "user.created",
13-
* data: {
11+
* await eventModuleService.emit({
12+
* name: "user.created",
13+
* data: {
1414
* user_id: "user_123"
1515
* }
1616
* })
@@ -22,12 +22,12 @@ export interface IEventBusModuleService {
2222

2323
/**
2424
* This method adds a subscriber to an event. It's mainly used internally to register subscribers.
25-
*
25+
*
2626
* @param eventName - The name of the event to subscribe to.
2727
* @param subscriber - The subscriber function to execute when the event is emitted.
2828
* @param context - The context of the subscriber.
2929
* @returns The instance of the Event Module
30-
*
30+
*
3131
* @example
3232
* eventModuleService.subscribe("user.created", async (data) => {
3333
* console.log("User created", data)
@@ -41,12 +41,12 @@ export interface IEventBusModuleService {
4141

4242
/**
4343
* This method removes a subscriber from an event. It's mainly used internally to unregister subscribers.
44-
*
44+
*
4545
* @param eventName - The name of the event to unsubscribe from.
4646
* @param subscriber - The subscriber function to remove.
4747
* @param context - The context of the subscriber.
4848
* @returns The instance of the Event Module
49-
*
49+
*
5050
* @example
5151
* eventModuleService.unsubscribe("user.created", async (data) => {
5252
* console.log("User created", data)
@@ -61,21 +61,29 @@ export interface IEventBusModuleService {
6161
/**
6262
* This method emits all events in the specified group. Grouped events are useful when you have distributed transactions
6363
* where you need to explicitly group, release and clear events upon lifecycle events of a transaction.
64-
*
64+
*
6565
* @param eventGroupId - The ID of the event group.
66-
*
66+
*
6767
* @example
6868
* await eventModuleService.releaseGroupedEvents("group_123")
6969
*/
7070
releaseGroupedEvents(eventGroupId: string): Promise<void>
7171
/**
7272
* This method removes all events in the specified group. Grouped events are useful when you have distributed transactions
7373
* where you need to explicitly group, release and clear events upon lifecycle events of a transaction.
74-
*
74+
*
7575
* @param eventGroupId - The ID of the event group.
76-
*
76+
* @param options - Additional options for the event.
77+
* @param options.eventNames - The names of the events to clear. If not provided, The group will
78+
* be entirely cleared.
79+
*
7780
* @example
7881
* await eventModuleService.clearGroupedEvents("group_123")
7982
*/
80-
clearGroupedEvents(eventGroupId: string): Promise<void>
83+
clearGroupedEvents(
84+
eventGroupId: string,
85+
options?: {
86+
eventNames?: string[]
87+
}
88+
): Promise<void>
8189
}

packages/core/utils/src/event-bus/index.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,16 @@ export abstract class AbstractEventBusModuleService
3838
*/
3939
// Given a eventGroupId, all the grouped events will be released
4040
abstract releaseGroupedEvents(eventGroupId: string): Promise<void>
41-
// Given a eventGroupId, all the grouped events will be cleared
42-
abstract clearGroupedEvents(eventGroupId: string): Promise<void>
41+
42+
// Given a eventGroupId, all the grouped events will be cleared unless eventNames are provided
43+
// If eventNames are provided, only the events that match the eventNames will be cleared from the
44+
// group
45+
abstract clearGroupedEvents(
46+
eventGroupId: string,
47+
options?: {
48+
eventNames?: string[]
49+
}
50+
): Promise<void>
4351

4452
protected storeSubscribers({
4553
event,

0 commit comments

Comments
 (0)