Skip to content

Commit 7baad75

Browse files
soccermaxMax Gruenfelder
andauthored
saga pattern (#435)
* saga pattern start * wip * wip * wip * add tests for general * remove comment * beta 0 --------- Co-authored-by: Max Gruenfelder <maximilian.gruenfelder@scheer-group.com>
1 parent 7102d6c commit 7baad75

File tree

8 files changed

+449
-27
lines changed

8 files changed

+449
-27
lines changed

package-lock.json

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/event-queue",
3-
"version": "2.0.4",
3+
"version": "2.1.0-beta.0",
44
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
55
"main": "src/index.js",
66
"types": "src/index.d.ts",

src/EventQueueProcessorBase.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const TRIES_FOR_EXCEEDED_EVENTS = 3;
2525
const EVENT_START_AFTER_HEADROOM = 3 * 1000;
2626
const SUFFIX_PERIODIC = "_PERIODIC";
2727

28-
const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error"];
28+
const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error", "nextData"];
2929

3030
class EventQueueProcessorBase {
3131
#eventsWithExceededTries = [];
@@ -43,6 +43,7 @@ class EventQueueProcessorBase {
4343
#currentKeepAlivePromise = Promise.resolve();
4444
#etagMap;
4545
#namespace;
46+
#nextSagaEvents;
4647

4748
constructor(context, eventType, eventSubType, config) {
4849
this.__context = context;
@@ -468,6 +469,10 @@ class EventQueueProcessorBase {
468469
return result;
469470
}, {});
470471

472+
if (!Object.values(updateData).length) {
473+
return;
474+
}
475+
471476
for (const { ids, data } of Object.values(updateData)) {
472477
if (!("status" in data)) {
473478
// TODO: Can this still happen?
@@ -522,6 +527,11 @@ class EventQueueProcessorBase {
522527
.where("ID IN", ids)
523528
);
524529
}
530+
531+
if (this.#nextSagaEvents?.length) {
532+
await tx.run(INSERT.into(this.#config.tableNameEventQueue).entries(this.#nextSagaEvents));
533+
this.#nextSagaEvents = [];
534+
}
525535
});
526536
}
527537

@@ -1390,6 +1400,10 @@ class EventQueueProcessorBase {
13901400
get allowedFieldsEventHandler() {
13911401
return ALLOWED_FIELDS_FOR_UPDATE;
13921402
}
1403+
1404+
set nextSagaEvents(value) {
1405+
this.#nextSagaEvents = value;
1406+
}
13931407
}
13941408

13951409
module.exports = EventQueueProcessorBase;

src/config.js

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const DEFAULT_RETRY_AFTER = 5 * 60 * 1000;
2828
const PRIORITIES = Object.values(Priorities);
2929
const UTC_DEFAULT = false;
3030
const USE_CRON_TZ_DEFAULT = true;
31+
const SAGA_SUCCESS = "#succeeded";
32+
const SAGA_FAILED = "#failed";
3133

3234
const BASE_TABLES = {
3335
EVENT: "sap.eventqueue.Event",
@@ -384,6 +386,20 @@ class Config {
384386
result.adHoc
385387
);
386388
result.adHoc[key] = specificEventConfig;
389+
const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/");
390+
if (config.events[sagaSuccessKey]) {
391+
const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
392+
srvConfig,
393+
name,
394+
fnName,
395+
result.adHoc
396+
);
397+
result.adHoc[sagaKey] = sagaSpecificEventConfig;
398+
} else {
399+
const sagaConfig = { ...specificEventConfig };
400+
sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/");
401+
result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig;
402+
}
387403
}
388404
}
389405
return result;
@@ -406,11 +422,24 @@ class Config {
406422
getCdsOutboxEventSpecificConfig(serviceName, action) {
407423
const srv = cds.env.requires[serviceName];
408424
const config = srv?.outbox ?? srv?.queued;
409-
if (config?.events?.[action]) {
425+
const specificConfig = config?.events?.[action];
426+
427+
if (specificConfig) {
410428
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[action]);
411-
} else {
429+
}
430+
431+
if (!action) {
412432
return null;
413433
}
434+
435+
const [withoutSaga, sagaSuffix] = action.split("/");
436+
if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) {
437+
if (config?.events?.[withoutSaga]) {
438+
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]);
439+
}
440+
}
441+
442+
return null;
414443
}
415444

416445
#mapEnvEvents(events) {

src/outbox/EventQueueGenericOutboxHandler.js

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const EVENT_QUEUE_ACTIONS = {
1414
EXCEEDED: "eventQueueRetriesExceeded",
1515
CLUSTER: "eventQueueCluster",
1616
CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload",
17+
SAGA_SUCCESS: "#succeeded",
18+
SAGA_FAILED: "#failed",
1719
};
1820

1921
class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
@@ -260,7 +262,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
260262
async checkEventAndGeneratePayload(queueEntry) {
261263
const payload = await super.checkEventAndGeneratePayload(queueEntry);
262264
const { event } = payload;
263-
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event);
265+
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event });
264266
if (!handlerName) {
265267
return payload;
266268
}
@@ -281,7 +283,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
281283

282284
async hookForExceededEvents(exceededEvent) {
283285
const { event } = exceededEvent.payload;
284-
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.EXCEEDED, event);
286+
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.EXCEEDED, event });
285287
if (!handlerName) {
286288
return await super.hookForExceededEvents(exceededEvent);
287289
}
@@ -299,14 +301,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
299301
return await super.beforeProcessingEvents();
300302
}
301303

302-
#checkHandlerExists(eventQueueFn, event) {
303-
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
304+
#checkHandlerExists({ eventQueueFn, event, saga } = {}) {
305+
if (eventQueueFn) {
306+
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
307+
if (specificHandler) {
308+
return specificHandler;
309+
}
310+
311+
const genericHandler = this.__onHandlers[eventQueueFn];
312+
return genericHandler ?? null;
313+
}
314+
315+
const specificHandler = this.__onHandlers[[event, saga].join("/")];
304316
if (specificHandler) {
305317
return specificHandler;
306318
}
307319

308-
const genericHandler = this.__onHandlers[eventQueueFn];
309-
return genericHandler ?? null;
320+
return this.__onHandlers[saga];
310321
}
311322

312323
async processPeriodicEvent(processContext, key, queueEntry) {
@@ -344,7 +355,9 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
344355
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
345356
await this.#setContextUser(processContext, userId, reg);
346357
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
347-
return this.#determineResultStatus(result, queueEntries);
358+
const statusTuple = this.#determineResultStatus(result, queueEntries);
359+
await this.#publishFollowupEvents(processContext, reg, statusTuple);
360+
return statusTuple;
348361
} catch (err) {
349362
this.logger.error("error processing outboxed service call", err, {
350363
serviceName: this.eventSubType,
@@ -359,11 +372,51 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
359372
}
360373
}
361374

375+
async #publishFollowupEvents(processContext, req, statusTuple) {
376+
const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS });
377+
const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED });
378+
379+
if (!succeeded && !failed) {
380+
return;
381+
}
382+
383+
if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
384+
return;
385+
}
386+
387+
// NOTE: required for #failed because tx is rolledback and new events would not be commmited!
388+
const tx = cds.tx(processContext);
389+
const nextEvents = tx._eventQueue?.events;
390+
391+
if (nextEvents?.length) {
392+
tx._eventQueue.events = [];
393+
}
394+
395+
for (const [, result] of statusTuple) {
396+
if (succeeded && result.status === EventProcessingStatus.Done) {
397+
await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data);
398+
}
399+
400+
if (failed && result.status === EventProcessingStatus.Error) {
401+
await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data);
402+
}
403+
404+
delete result.nextData;
405+
}
406+
407+
if (config.insertEventsBeforeCommit) {
408+
this.nextSagaEvents = tx._eventQueue.events;
409+
} else {
410+
this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed);
411+
}
412+
tx._eventQueue.events = nextEvents ?? [];
413+
}
414+
362415
#determineResultStatus(result, queueEntries) {
363416
const validStatusValues = Object.values(EventProcessingStatus);
364417
const validStatus = validStatusValues.includes(result);
365418
if (validStatus) {
366-
return queueEntries.map((queueEntry) => [queueEntry.ID, result]);
419+
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: result }]);
367420
}
368421

369422
if (result instanceof Object && !Array.isArray(result)) {
@@ -375,7 +428,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
375428
}
376429

377430
if (!Array.isArray(result)) {
378-
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
431+
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
379432
}
380433

381434
const [firstEntry] = result;
@@ -430,7 +483,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
430483
if (valid) {
431484
return result;
432485
} else {
433-
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
486+
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
434487
}
435488
}
436489
}

src/publishEvent.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ const publishEvent = async (
7676
event.namespace = config.namespace;
7777
}
7878
}
79+
_addEventsToContext(tx, events);
7980
if (config.insertEventsBeforeCommit && !skipInsertEventsBeforeCommit) {
80-
_registerHandlerAndAddEvents(tx, events, skipBroadcast);
81+
_registerHandler(tx, skipBroadcast);
8182
} else {
8283
let result;
8384
tx._skipEventQueueBroadcast = skipBroadcast;
@@ -87,10 +88,7 @@ const publishEvent = async (
8788
}
8889
};
8990

90-
const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
91-
tx._eventQueue ??= { events: [], handlerRegistered: false };
92-
tx._eventQueue.events = tx._eventQueue.events.concat(events);
93-
91+
const _registerHandler = (tx, skipBroadcast) => {
9492
if (tx._eventQueue.handlerRegistered) {
9593
return;
9694
}
@@ -106,6 +104,11 @@ const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
106104
});
107105
};
108106

107+
const _addEventsToContext = (tx, events) => {
108+
tx._eventQueue ??= { events: [], handlerRegistered: false };
109+
tx._eventQueue.events = tx._eventQueue.events.concat(events);
110+
};
111+
109112
module.exports = {
110113
publishEvent,
111114
};

0 commit comments

Comments
 (0)