Skip to content

Commit f56310e

Browse files
author
Max Gruenfelder
committed
saga pattern start
1 parent 7102d6c commit f56310e

File tree

4 files changed

+194
-9
lines changed

4 files changed

+194
-9
lines changed

src/EventQueueProcessorBase.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class EventQueueProcessorBase {
4343
#currentKeepAlivePromise = Promise.resolve();
4444
#etagMap;
4545
#namespace;
46+
#nextSagaEvents;
4647

4748
constructor(context, eventType, eventSubType, config) {
4849
this.__context = context;
@@ -468,6 +469,10 @@ class EventQueueProcessorBase {
468469
return result;
469470
}, {});
470471

472+
if (!Object.values(updateData).length) {
473+
return;
474+
}
475+
471476
for (const { ids, data } of Object.values(updateData)) {
472477
if (!("status" in data)) {
473478
// TODO: Can this still happen?
@@ -522,6 +527,11 @@ class EventQueueProcessorBase {
522527
.where("ID IN", ids)
523528
);
524529
}
530+
531+
if (this.#nextSagaEvents?.length) {
532+
await tx.run(INSERT.into(this.#config.tableNameEventQueue).entries(this.#nextSagaEvents));
533+
this.#nextSagaEvents = [];
534+
}
525535
});
526536
}
527537

@@ -1390,6 +1400,10 @@ class EventQueueProcessorBase {
13901400
get allowedFieldsEventHandler() {
13911401
return ALLOWED_FIELDS_FOR_UPDATE;
13921402
}
1403+
1404+
set nextSagaEvents(value) {
1405+
this.#nextSagaEvents = value;
1406+
}
13931407
}
13941408

13951409
module.exports = EventQueueProcessorBase;

src/outbox/EventQueueGenericOutboxHandler.js

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const EVENT_QUEUE_ACTIONS = {
1414
EXCEEDED: "eventQueueRetriesExceeded",
1515
CLUSTER: "eventQueueCluster",
1616
CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload",
17+
SAGA_SUCCESS: "#succeeded",
18+
SAGA_FAILED: "#failed",
1719
};
1820

1921
class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
@@ -260,7 +262,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
260262
async checkEventAndGeneratePayload(queueEntry) {
261263
const payload = await super.checkEventAndGeneratePayload(queueEntry);
262264
const { event } = payload;
263-
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event);
265+
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event });
264266
if (!handlerName) {
265267
return payload;
266268
}
@@ -281,7 +283,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
281283

282284
async hookForExceededEvents(exceededEvent) {
283285
const { event } = exceededEvent.payload;
284-
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.EXCEEDED, event);
286+
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.EXCEEDED, event });
285287
if (!handlerName) {
286288
return await super.hookForExceededEvents(exceededEvent);
287289
}
@@ -299,14 +301,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
299301
return await super.beforeProcessingEvents();
300302
}
301303

302-
#checkHandlerExists(eventQueueFn, event) {
303-
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
304+
#checkHandlerExists({ eventQueueFn, event, saga } = {}) {
305+
if (eventQueueFn) {
306+
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
307+
if (specificHandler) {
308+
return specificHandler;
309+
}
310+
311+
const genericHandler = this.__onHandlers[eventQueueFn];
312+
return genericHandler ?? null;
313+
}
314+
315+
const specificHandler = this.__onHandlers[[event, saga].join("/")];
304316
if (specificHandler) {
305317
return specificHandler;
306318
}
307319

308-
const genericHandler = this.__onHandlers[eventQueueFn];
309-
return genericHandler ?? null;
320+
return this.__onHandlers[saga];
310321
}
311322

312323
async processPeriodicEvent(processContext, key, queueEntry) {
@@ -319,7 +330,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
319330
#buildDispatchData(context, payload, { key, queueEntries } = {}) {
320331
const { useEventQueueUser } = this.eventConfig;
321332
const userId = useEventQueueUser ? config.userId : payload.contextUser;
322-
const reg = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload);
333+
const reg = payload.das_fromSend ? new cds.Request(payload) : new cds.Event(payload);
323334
const invocationFn = payload._fromSend ? "send" : "emit";
324335
delete reg._fromSend;
325336
delete reg.contextUser;
@@ -344,7 +355,9 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
344355
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
345356
await this.#setContextUser(processContext, userId, reg);
346357
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
347-
return this.#determineResultStatus(result, queueEntries);
358+
const statusTuple = this.#determineResultStatus(result, queueEntries);
359+
await this.#publishFollowupEvents(processContext, reg, statusTuple);
360+
return statusTuple;
348361
} catch (err) {
349362
this.logger.error("error processing outboxed service call", err, {
350363
serviceName: this.eventSubType,
@@ -359,11 +372,41 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
359372
}
360373
}
361374

375+
async #publishFollowupEvents(processContext, req, statusTuple) {
376+
const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS });
377+
const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED });
378+
379+
if (!succeeded && !failed) {
380+
return;
381+
}
382+
383+
// NOTE: required for #failed because tx is rolledback and new events would not be commmited!
384+
const tx = cds.tx(processContext);
385+
const nextEvents = tx._eventQueue?.events;
386+
387+
if (nextEvents?.length) {
388+
tx._eventQueue.events = [];
389+
}
390+
391+
for (const [id, result] of statusTuple) {
392+
if (succeeded && result.status === EventProcessingStatus.Done) {
393+
(await this.__srv.tx(processContext)).send(succeeded, result.nextData ?? req.data);
394+
}
395+
396+
if (failed && result.status === EventProcessingStatus.Error) {
397+
(await this.__srv.tx(processContext)).send(failed, result.nextData ?? req.data);
398+
}
399+
}
400+
401+
this.nextSagaEvents = tx._eventQueue.events;
402+
tx._eventQueue.events = nextEvents ?? [];
403+
}
404+
362405
#determineResultStatus(result, queueEntries) {
363406
const validStatusValues = Object.values(EventProcessingStatus);
364407
const validStatus = validStatusValues.includes(result);
365408
if (validStatus) {
366-
return queueEntries.map((queueEntry) => [queueEntry.ID, result]);
409+
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: result }]);
367410
}
368411

369412
if (result instanceof Object && !Array.isArray(result)) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"use strict";
2+
3+
const cds = require("@sap/cds");
4+
5+
class StandardService extends cds.Service {
6+
async init() {
7+
await super.init();
8+
this.on("saga", (req) => {
9+
cds.log(this.name).info(req.event, {
10+
data: req.data,
11+
user: req.user.id,
12+
error: req.data.error,
13+
});
14+
15+
return {
16+
status: req.data.status ?? 2,
17+
...(req.data.nextData && { nextData: req.data.nextData }),
18+
...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }),
19+
};
20+
});
21+
22+
this.on("saga/#succeeded", (req) => {
23+
cds.log(this.name).info(req.event, {
24+
data: req.data,
25+
user: req.user.id,
26+
error: req.data.error,
27+
});
28+
29+
return {
30+
status: req.data.status ?? 2,
31+
...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }),
32+
};
33+
});
34+
35+
this.on("saga/#failed", (req) => {
36+
cds.log(this.name).info(req.event, {
37+
data: req.data,
38+
user: req.user.id,
39+
error: req.data.error,
40+
});
41+
42+
return {
43+
status: 2,
44+
...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }),
45+
};
46+
});
47+
}
48+
}
49+
50+
module.exports = StandardService;

test/eventQueueOutbox.test.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,13 @@ cds.env.requires.Namespace = {
162162
},
163163
};
164164

165+
cds.env.requires.Saga = {
166+
impl: path.join(basePath, "srv/service/saga-service.js"),
167+
queued: {
168+
kind: "persistent-queue",
169+
},
170+
};
171+
165172
cds.env.requires["sapafcsdk.scheduling.ProviderService"] = {
166173
impl: path.join(basePath, "srv/service/standard-service.js"),
167174
outbox: {
@@ -2470,6 +2477,77 @@ describe("event-queue outbox", () => {
24702477
expect(loggerMock.callsLengths().error).toEqual(0);
24712478
});
24722479
});
2480+
2481+
describe("saga pattern", () => {
2482+
describe("event specific handlers", () => {
2483+
it("if succeeded handler exists and event is green, trigger next event", async () => {
2484+
const service = await cds.connect.to("Saga");
2485+
await service.send("saga", {});
2486+
await commitAndOpenNew();
2487+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2488+
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
2489+
expectedLength: 2,
2490+
additionalColumns: ["payload"],
2491+
});
2492+
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
2493+
expect(done.status).toEqual(EventProcessingStatus.Done);
2494+
2495+
expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" });
2496+
expect(next.status).toEqual(EventProcessingStatus.Done);
2497+
expect(loggerMock.callsLengths().error).toEqual(0);
2498+
});
2499+
2500+
it("if failed handler exists and event is red, trigger next event", async () => {
2501+
const service = await cds.connect.to("Saga");
2502+
await service.send("saga", { status: EventProcessingStatus.Error });
2503+
await commitAndOpenNew();
2504+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2505+
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
2506+
expectedLength: 2,
2507+
additionalColumns: ["payload", "lastAttemptTimestamp"],
2508+
});
2509+
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
2510+
expect(done.status).toEqual(EventProcessingStatus.Error);
2511+
2512+
expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#failed" });
2513+
expect(next.status).toEqual(EventProcessingStatus.Done);
2514+
expect(loggerMock.callsLengths().error).toEqual(0);
2515+
});
2516+
2517+
it.skip("how to deal with specific event configuration srv.actionName", async () => {
2518+
const service = await cds.connect.to("Saga");
2519+
await service.send("saga");
2520+
await commitAndOpenNew();
2521+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2522+
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
2523+
expectedLength: 2,
2524+
additionalColumns: ["payload", "status"],
2525+
});
2526+
expect(JSON.parse(done.payload)).toMatchObject({});
2527+
expect(JSON.parse(done.payload)).toMatchObject({});
2528+
expect(loggerMock.callsLengths().error).toEqual(0);
2529+
});
2530+
});
2531+
2532+
describe("provide next data", () => {
2533+
it("failed handler with next data", async () => {
2534+
const service = await cds.connect.to("Saga");
2535+
await service.send("saga", { status: EventProcessingStatus.Error, nextData: { newData: "dummyData" } });
2536+
await commitAndOpenNew();
2537+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2538+
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
2539+
expectedLength: 2,
2540+
additionalColumns: ["payload"],
2541+
});
2542+
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
2543+
expect(done.status).toEqual(EventProcessingStatus.Error);
2544+
2545+
expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#failed", data: { newData: "dummyData" } });
2546+
expect(next.status).toEqual(EventProcessingStatus.Done);
2547+
expect(loggerMock.callsLengths().error).toEqual(0);
2548+
});
2549+
});
2550+
});
24732551
});
24742552

24752553
const commitAndOpenNew = async () => {

0 commit comments

Comments
 (0)