Skip to content

Commit 39b932e

Browse files
soccermaxMax Gruenfelder
andauthored
saga v2 (#436)
* saga 2 * saga v2 * remove comment --------- Co-authored-by: Max Gruenfelder <maximilian.gruenfelder@scheer-group.com>
1 parent 7baad75 commit 39b932e

File tree

6 files changed

+91
-26
lines changed

6 files changed

+91
-26
lines changed

package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/event-queue",
3-
"version": "2.1.0-beta.0",
3+
"version": "2.1.0-beta.1",
44
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
55
"main": "src/index.js",
66
"types": "src/index.d.ts",
@@ -22,8 +22,6 @@
2222
"multi-tenancy"
2323
],
2424
"scripts": {
25-
"start": "PORT=4005 cds-serve",
26-
"watch": "PORT=4005 cds watch",
2725
"test:unit": "jest --selectProjects unit",
2826
"test:integration": "jest --selectProjects integration --runInBand",
2927
"voter:test:integration": "jest --selectProjects integration",
@@ -38,13 +36,12 @@
3836
"eslint:ci": "eslint .",
3937
"prettier": "prettier --write --loglevel error .",
4038
"prettier:ci": "prettier --check .",
41-
"prepareRelease": "npm prune --production",
4239
"docs": "cd docs && bundle exec jekyll serve",
4340
"docs:install": "cd docs && npx shx rm -rf vendor Gemfile.lock && bundle install",
4441
"upgrade-lock": "npx shx rm -rf package-lock.json node_modules && npm i --package-lock"
4542
},
4643
"engines": {
47-
"node": ">=18"
44+
"node": ">=20"
4845
},
4946
"dependencies": {
5047
"@cap-js-community/common": "^0.3.4",

src/EventQueueProcessorBase.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ class EventQueueProcessorBase {
408408
UPDATE.entity(this.#config.tableNameEventQueue)
409409
.set({
410410
status: status,
411-
...(error && { error: this.#error2String(error) }),
411+
...(error && { error: this._error2String(error) }),
412412
})
413413
.where({
414414
ID: queueEntryIds,
@@ -498,7 +498,7 @@ class EventQueueProcessorBase {
498498
}
499499

500500
if (data.error) {
501-
data.error = this.#error2String(data.error);
501+
data.error = this._error2String(data.error);
502502
}
503503

504504
if (!data.startAfter && [EventProcessingStatus.Error, EventProcessingStatus.Open].includes(data.status)) {
@@ -535,7 +535,7 @@ class EventQueueProcessorBase {
535535
});
536536
}
537537

538-
#error2String(error) {
538+
_error2String(error) {
539539
return JSON.stringify(error, (_, value) => this.#errorReplacer(value));
540540
}
541541

src/outbox/EventQueueGenericOutboxHandler.js

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,10 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
312312
return genericHandler ?? null;
313313
}
314314

315+
if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) {
316+
[event] = event.split("/");
317+
}
318+
315319
const specificHandler = this.__onHandlers[[event, saga].join("/")];
316320
if (specificHandler) {
317321
return specificHandler;
@@ -351,25 +355,27 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
351355
}
352356

353357
async processEvent(processContext, key, queueEntries, payload) {
358+
let statusTuple;
359+
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
354360
try {
355-
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
356361
await this.#setContextUser(processContext, userId, reg);
357362
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
358-
const statusTuple = this.#determineResultStatus(result, queueEntries);
359-
await this.#publishFollowupEvents(processContext, reg, statusTuple);
360-
return statusTuple;
363+
statusTuple = this.#determineResultStatus(result, queueEntries);
361364
} catch (err) {
362365
this.logger.error("error processing outboxed service call", err, {
363366
serviceName: this.eventSubType,
364367
});
365-
return queueEntries.map((queueEntry) => [
368+
statusTuple = queueEntries.map((queueEntry) => [
366369
queueEntry.ID,
367370
{
368371
status: EventProcessingStatus.Error,
369372
error: err,
370373
},
371374
]);
372375
}
376+
377+
await this.#publishFollowupEvents(processContext, reg, statusTuple);
378+
return statusTuple;
373379
}
374380

375381
async #publishFollowupEvents(processContext, req, statusTuple) {
@@ -380,7 +386,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
380386
return;
381387
}
382388

383-
if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
389+
if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
384390
return;
385391
}
386392

@@ -393,23 +399,32 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
393399
}
394400

395401
for (const [, result] of statusTuple) {
396-
if (succeeded && result.status === EventProcessingStatus.Done) {
397-
await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data);
402+
const data = result.nextData ?? req.data;
403+
if (
404+
succeeded &&
405+
result.status === EventProcessingStatus.Done &&
406+
!req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)
407+
) {
408+
await this.__srv.tx(processContext).send(succeeded, data);
398409
}
399410

400411
if (failed && result.status === EventProcessingStatus.Error) {
401-
await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data);
412+
result.error && (data.error = this._error2String(result.error));
413+
await this.__srv.tx(processContext).send(failed, data);
402414
}
403415

404416
delete result.nextData;
405417
}
406418

407419
if (config.insertEventsBeforeCommit) {
408-
this.nextSagaEvents = tx._eventQueue.events;
420+
this.nextSagaEvents = tx._eventQueue?.events;
409421
} else {
410-
this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed);
422+
this.nextSagaEvents = tx._eventQueue?.events.filter((event) => JSON.parse(event.payload).event === failed);
423+
}
424+
425+
if (tx._eventQueue) {
426+
tx._eventQueue.events = nextEvents ?? [];
411427
}
412-
tx._eventQueue.events = nextEvents ?? [];
413428
}
414429

415430
#determineResultStatus(result, queueEntries) {

test/asset/outboxProject/srv/service/saga-service.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ class StandardService extends cds.Service {
1212
error: req.data.error,
1313
});
1414

15+
if (req.data.throw) {
16+
throw new Error(req.data.throw);
17+
}
18+
1519
return {
1620
status: req.data.status ?? 2,
1721
...(req.data.nextData && { nextData: req.data.nextData }),
@@ -94,7 +98,7 @@ class StandardService extends cds.Service {
9498
});
9599

96100
return {
97-
status: 2,
101+
status: req.data.status ?? 2,
98102
...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }),
99103
};
100104
});

test/eventQueueOutbox.test.js

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2508,7 +2508,10 @@ describe("event-queue outbox", () => {
25082508

25092509
it("if failed handler exists and event is red, trigger next event", async () => {
25102510
const service = await cds.connect.to("Saga");
2511-
await service.send("saga", { status: EventProcessingStatus.Error });
2511+
await service.send("saga", {
2512+
status: EventProcessingStatus.Error,
2513+
nextData: { status: EventProcessingStatus.Done },
2514+
});
25122515
await commitAndOpenNew();
25132516
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
25142517
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
@@ -2541,12 +2544,58 @@ describe("event-queue outbox", () => {
25412544
expect(loggerMock.callsLengths().error).toEqual(0);
25422545
config.insertEventsBeforeCommit = true;
25432546
});
2547+
2548+
it("exception should trigger failed and pass error", async () => {
2549+
const service = await cds.connect.to("Saga");
2550+
await service.send("saga", { throw: "error", nextData: { status: EventProcessingStatus.Done } });
2551+
await commitAndOpenNew();
2552+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2553+
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
2554+
expectedLength: 2,
2555+
additionalColumns: ["payload", "lastAttemptTimestamp"],
2556+
});
2557+
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
2558+
expect(done.status).toEqual(EventProcessingStatus.Error);
2559+
2560+
expect(JSON.parse(next.payload)).toMatchObject({
2561+
event: "saga/#failed",
2562+
data: { error: expect.stringContaining("error") },
2563+
});
2564+
expect(next.status).toEqual(EventProcessingStatus.Done);
2565+
expect(loggerMock.callsLengths().error).toEqual(1);
2566+
});
2567+
2568+
it("error in succeeded should trigger failed", async () => {
2569+
const service = await cds.connect.to("Saga");
2570+
await service.send("saga", {
2571+
status: EventProcessingStatus.Done,
2572+
nextData: { status: EventProcessingStatus.Error },
2573+
});
2574+
await commitAndOpenNew();
2575+
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
2576+
const [done, next, nextFailed] = await testHelper.selectEventQueueAndReturn(tx, {
2577+
expectedLength: 3,
2578+
additionalColumns: ["payload", "lastAttemptTimestamp"],
2579+
});
2580+
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
2581+
expect(done.status).toEqual(EventProcessingStatus.Done);
2582+
2583+
expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" });
2584+
expect(next.status).toEqual(EventProcessingStatus.Error);
2585+
2586+
expect(JSON.parse(nextFailed.payload)).toMatchObject({ event: "saga/#failed" });
2587+
expect(nextFailed.status).toEqual(EventProcessingStatus.Error);
2588+
expect(loggerMock.callsLengths().error).toEqual(0);
2589+
});
25442590
});
25452591

25462592
describe("provide next data", () => {
25472593
it("failed handler with next data", async () => {
25482594
const service = await cds.connect.to("Saga");
2549-
await service.send("saga", { status: EventProcessingStatus.Error, nextData: { newData: "dummyData" } });
2595+
await service.send("saga", {
2596+
status: EventProcessingStatus.Error,
2597+
nextData: { newData: "dummyData", status: EventProcessingStatus.Done },
2598+
});
25502599
await commitAndOpenNew();
25512600
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
25522601
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {

0 commit comments

Comments
 (0)