Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cap-js-community/event-queue",
"version": "2.0.4",
"version": "2.1.0-beta.0",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
"main": "src/index.js",
"types": "src/index.d.ts",
Expand Down
16 changes: 15 additions & 1 deletion src/EventQueueProcessorBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const TRIES_FOR_EXCEEDED_EVENTS = 3;
const EVENT_START_AFTER_HEADROOM = 3 * 1000;
const SUFFIX_PERIODIC = "_PERIODIC";

const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error"];
const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error", "nextData"];

class EventQueueProcessorBase {
#eventsWithExceededTries = [];
Expand All @@ -43,6 +43,7 @@ class EventQueueProcessorBase {
#currentKeepAlivePromise = Promise.resolve();
#etagMap;
#namespace;
#nextSagaEvents;

constructor(context, eventType, eventSubType, config) {
this.__context = context;
Expand Down Expand Up @@ -468,6 +469,10 @@ class EventQueueProcessorBase {
return result;
}, {});

if (!Object.values(updateData).length) {
return;
}

for (const { ids, data } of Object.values(updateData)) {
if (!("status" in data)) {
// TODO: Can this still happen?
Expand Down Expand Up @@ -522,6 +527,11 @@ class EventQueueProcessorBase {
.where("ID IN", ids)
);
}

if (this.#nextSagaEvents?.length) {
await tx.run(INSERT.into(this.#config.tableNameEventQueue).entries(this.#nextSagaEvents));
this.#nextSagaEvents = [];
}
});
}

Expand Down Expand Up @@ -1390,6 +1400,10 @@ class EventQueueProcessorBase {
get allowedFieldsEventHandler() {
return ALLOWED_FIELDS_FOR_UPDATE;
}

set nextSagaEvents(value) {
this.#nextSagaEvents = value;
}
}

module.exports = EventQueueProcessorBase;
33 changes: 31 additions & 2 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const DEFAULT_RETRY_AFTER = 5 * 60 * 1000;
const PRIORITIES = Object.values(Priorities);
const UTC_DEFAULT = false;
const USE_CRON_TZ_DEFAULT = true;
const SAGA_SUCCESS = "#succeeded";
const SAGA_FAILED = "#failed";

const BASE_TABLES = {
EVENT: "sap.eventqueue.Event",
Expand Down Expand Up @@ -384,6 +386,20 @@ class Config {
result.adHoc
);
result.adHoc[key] = specificEventConfig;
const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/");
if (config.events[sagaSuccessKey]) {
const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
srvConfig,
name,
fnName,
result.adHoc
);
result.adHoc[sagaKey] = sagaSpecificEventConfig;
} else {
const sagaConfig = { ...specificEventConfig };
sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/");
result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig;
}
}
}
return result;
Expand All @@ -406,11 +422,24 @@ class Config {
getCdsOutboxEventSpecificConfig(serviceName, action) {
const srv = cds.env.requires[serviceName];
const config = srv?.outbox ?? srv?.queued;
if (config?.events?.[action]) {
const specificConfig = config?.events?.[action];

if (specificConfig) {
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[action]);
} else {
}

if (!action) {
return null;
}

const [withoutSaga, sagaSuffix] = action.split("/");
if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) {
if (config?.events?.[withoutSaga]) {
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]);
}
}

return null;
}

#mapEnvEvents(events) {
Expand Down
73 changes: 63 additions & 10 deletions src/outbox/EventQueueGenericOutboxHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const EVENT_QUEUE_ACTIONS = {
EXCEEDED: "eventQueueRetriesExceeded",
CLUSTER: "eventQueueCluster",
CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload",
SAGA_SUCCESS: "#succeeded",
SAGA_FAILED: "#failed",
};

class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
Expand Down Expand Up @@ -260,7 +262,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
async checkEventAndGeneratePayload(queueEntry) {
const payload = await super.checkEventAndGeneratePayload(queueEntry);
const { event } = payload;
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event);
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event });
if (!handlerName) {
return payload;
}
Expand All @@ -281,7 +283,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {

async hookForExceededEvents(exceededEvent) {
const { event } = exceededEvent.payload;
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.EXCEEDED, event);
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.EXCEEDED, event });
if (!handlerName) {
return await super.hookForExceededEvents(exceededEvent);
}
Expand All @@ -299,14 +301,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
return await super.beforeProcessingEvents();
}

#checkHandlerExists(eventQueueFn, event) {
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
#checkHandlerExists({ eventQueueFn, event, saga } = {}) {
if (eventQueueFn) {
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
if (specificHandler) {
return specificHandler;
}

const genericHandler = this.__onHandlers[eventQueueFn];
return genericHandler ?? null;
}

const specificHandler = this.__onHandlers[[event, saga].join("/")];
if (specificHandler) {
return specificHandler;
}

const genericHandler = this.__onHandlers[eventQueueFn];
return genericHandler ?? null;
return this.__onHandlers[saga];
}

async processPeriodicEvent(processContext, key, queueEntry) {
Expand Down Expand Up @@ -344,7 +355,9 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
await this.#setContextUser(processContext, userId, reg);
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
return this.#determineResultStatus(result, queueEntries);
const statusTuple = this.#determineResultStatus(result, queueEntries);
await this.#publishFollowupEvents(processContext, reg, statusTuple);
return statusTuple;
} catch (err) {
this.logger.error("error processing outboxed service call", err, {
serviceName: this.eventSubType,
Expand All @@ -359,11 +372,51 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}
}

async #publishFollowupEvents(processContext, req, statusTuple) {
const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS });
const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED });

if (!succeeded && !failed) {
return;
}

if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
return;
}

// NOTE: required for #failed because tx is rolledback and new events would not be commmited!
const tx = cds.tx(processContext);
const nextEvents = tx._eventQueue?.events;

if (nextEvents?.length) {
tx._eventQueue.events = [];
}

for (const [, result] of statusTuple) {
if (succeeded && result.status === EventProcessingStatus.Done) {
await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data);
}

if (failed && result.status === EventProcessingStatus.Error) {
await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data);
}

delete result.nextData;
}

if (config.insertEventsBeforeCommit) {
this.nextSagaEvents = tx._eventQueue.events;
} else {
this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed);
}
tx._eventQueue.events = nextEvents ?? [];
}

#determineResultStatus(result, queueEntries) {
const validStatusValues = Object.values(EventProcessingStatus);
const validStatus = validStatusValues.includes(result);
if (validStatus) {
return queueEntries.map((queueEntry) => [queueEntry.ID, result]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: result }]);
}

if (result instanceof Object && !Array.isArray(result)) {
Expand All @@ -375,7 +428,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}

if (!Array.isArray(result)) {
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
}

const [firstEntry] = result;
Expand Down Expand Up @@ -430,7 +483,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
if (valid) {
return result;
} else {
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/publishEvent.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ const publishEvent = async (
event.namespace = config.namespace;
}
}
_addEventsToContext(tx, events);
if (config.insertEventsBeforeCommit && !skipInsertEventsBeforeCommit) {
_registerHandlerAndAddEvents(tx, events, skipBroadcast);
_registerHandler(tx, skipBroadcast);
} else {
let result;
tx._skipEventQueueBroadcast = skipBroadcast;
Expand All @@ -87,10 +88,7 @@ const publishEvent = async (
}
};

const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
tx._eventQueue ??= { events: [], handlerRegistered: false };
tx._eventQueue.events = tx._eventQueue.events.concat(events);

const _registerHandler = (tx, skipBroadcast) => {
if (tx._eventQueue.handlerRegistered) {
return;
}
Expand All @@ -106,6 +104,11 @@ const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
});
};

const _addEventsToContext = (tx, events) => {
tx._eventQueue ??= { events: [], handlerRegistered: false };
tx._eventQueue.events = tx._eventQueue.events.concat(events);
};

module.exports = {
publishEvent,
};
Loading