Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/EventQueueProcessorBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const TRIES_FOR_EXCEEDED_EVENTS = 3;
const EVENT_START_AFTER_HEADROOM = 3 * 1000;
const SUFFIX_PERIODIC = "_PERIODIC";

const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error"];
const ALLOWED_FIELDS_FOR_UPDATE = ["status", "startAfter", "error", "nextData"];

class EventQueueProcessorBase {
#eventsWithExceededTries = [];
Expand All @@ -43,6 +43,7 @@ class EventQueueProcessorBase {
#currentKeepAlivePromise = Promise.resolve();
#etagMap;
#namespace;
#nextSagaEvents;

constructor(context, eventType, eventSubType, config) {
this.__context = context;
Expand Down Expand Up @@ -468,6 +469,10 @@ class EventQueueProcessorBase {
return result;
}, {});

if (!Object.values(updateData).length) {
return;
}

for (const { ids, data } of Object.values(updateData)) {
if (!("status" in data)) {
// TODO: Can this still happen?
Expand Down Expand Up @@ -522,6 +527,11 @@ class EventQueueProcessorBase {
.where("ID IN", ids)
);
}

if (this.#nextSagaEvents?.length) {
await tx.run(INSERT.into(this.#config.tableNameEventQueue).entries(this.#nextSagaEvents));
this.#nextSagaEvents = [];
}
});
}

Expand Down Expand Up @@ -1390,6 +1400,10 @@ class EventQueueProcessorBase {
get allowedFieldsEventHandler() {
return ALLOWED_FIELDS_FOR_UPDATE;
}

set nextSagaEvents(value) {
this.#nextSagaEvents = value;
}
}

module.exports = EventQueueProcessorBase;
33 changes: 31 additions & 2 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const DEFAULT_RETRY_AFTER = 5 * 60 * 1000;
const PRIORITIES = Object.values(Priorities);
const UTC_DEFAULT = false;
const USE_CRON_TZ_DEFAULT = true;
const SAGA_SUCCESS = "#succeeded";
const SAGA_FAILED = "#failed";

const BASE_TABLES = {
EVENT: "sap.eventqueue.Event",
Expand Down Expand Up @@ -384,6 +386,20 @@ class Config {
result.adHoc
);
result.adHoc[key] = specificEventConfig;
const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/");
if (config.events[sagaSuccessKey]) {
const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
srvConfig,
name,
fnName,
result.adHoc
);
result.adHoc[sagaKey] = sagaSpecificEventConfig;
} else {
const sagaConfig = { ...specificEventConfig };
sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/");
result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig;
}
}
}
return result;
Expand All @@ -406,11 +422,24 @@ class Config {
getCdsOutboxEventSpecificConfig(serviceName, action) {
const srv = cds.env.requires[serviceName];
const config = srv?.outbox ?? srv?.queued;
if (config?.events?.[action]) {
const specificConfig = config?.events?.[action];

if (specificConfig) {
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[action]);
} else {
}

if (!action) {
return null;
}

const [withoutSaga, sagaSuffix] = action.split("/");
if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) {
if (config?.events?.[withoutSaga]) {
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]);
}
}

return null;
}

#mapEnvEvents(events) {
Expand Down
76 changes: 65 additions & 11 deletions src/outbox/EventQueueGenericOutboxHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const EVENT_QUEUE_ACTIONS = {
EXCEEDED: "eventQueueRetriesExceeded",
CLUSTER: "eventQueueCluster",
CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload",
SAGA_SUCCESS: "#succeeded",
SAGA_FAILED: "#failed",
};

class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
Expand Down Expand Up @@ -260,7 +262,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
async checkEventAndGeneratePayload(queueEntry) {
const payload = await super.checkEventAndGeneratePayload(queueEntry);
const { event } = payload;
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event);
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.CHECK_AND_ADJUST, event });
if (!handlerName) {
return payload;
}
Expand All @@ -281,7 +283,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {

async hookForExceededEvents(exceededEvent) {
const { event } = exceededEvent.payload;
const handlerName = this.#checkHandlerExists(EVENT_QUEUE_ACTIONS.EXCEEDED, event);
const handlerName = this.#checkHandlerExists({ eventQueueFn: EVENT_QUEUE_ACTIONS.EXCEEDED, event });
if (!handlerName) {
return await super.hookForExceededEvents(exceededEvent);
}
Expand All @@ -299,14 +301,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
return await super.beforeProcessingEvents();
}

#checkHandlerExists(eventQueueFn, event) {
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
#checkHandlerExists({ eventQueueFn, event, saga } = {}) {
if (eventQueueFn) {
const specificHandler = this.__onHandlers[[eventQueueFn, event].join(".")];
if (specificHandler) {
return specificHandler;
}

const genericHandler = this.__onHandlers[eventQueueFn];
return genericHandler ?? null;
}

const specificHandler = this.__onHandlers[[event, saga].join("/")];
if (specificHandler) {
return specificHandler;
}

const genericHandler = this.__onHandlers[eventQueueFn];
return genericHandler ?? null;
return this.__onHandlers[saga];
}

async processPeriodicEvent(processContext, key, queueEntry) {
Expand All @@ -319,7 +330,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
#buildDispatchData(context, payload, { key, queueEntries } = {}) {
const { useEventQueueUser } = this.eventConfig;
const userId = useEventQueueUser ? config.userId : payload.contextUser;
const reg = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload);
const reg = payload.das_fromSend ? new cds.Request(payload) : new cds.Event(payload);
const invocationFn = payload._fromSend ? "send" : "emit";
delete reg._fromSend;
delete reg.contextUser;
Expand All @@ -344,7 +355,9 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
await this.#setContextUser(processContext, userId, reg);
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
return this.#determineResultStatus(result, queueEntries);
const statusTuple = this.#determineResultStatus(result, queueEntries);
await this.#publishFollowupEvents(processContext, reg, statusTuple);
return statusTuple;
} catch (err) {
this.logger.error("error processing outboxed service call", err, {
serviceName: this.eventSubType,
Expand All @@ -359,11 +372,52 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}
}

//TODO: exit if saga event
async #publishFollowupEvents(processContext, req, statusTuple) {
const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS });
const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED });

if (!succeeded && !failed) {
return;
}

if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
return;
}

// NOTE: required for #failed because tx is rolledback and new events would not be commmited!
const tx = cds.tx(processContext);
const nextEvents = tx._eventQueue?.events;

if (nextEvents?.length) {
tx._eventQueue.events = [];
}

for (const [, result] of statusTuple) {
if (succeeded && result.status === EventProcessingStatus.Done) {
await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data);
}

if (failed && result.status === EventProcessingStatus.Error) {
await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data);
}

delete result.nextData;
}

if (config.insertEventsBeforeCommit) {
this.nextSagaEvents = tx._eventQueue.events;
} else {
this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed);
}
tx._eventQueue.events = nextEvents ?? [];
}

#determineResultStatus(result, queueEntries) {
const validStatusValues = Object.values(EventProcessingStatus);
const validStatus = validStatusValues.includes(result);
if (validStatus) {
return queueEntries.map((queueEntry) => [queueEntry.ID, result]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: result }]);
}

if (result instanceof Object && !Array.isArray(result)) {
Expand All @@ -375,7 +429,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}

if (!Array.isArray(result)) {
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
}

const [firstEntry] = result;
Expand Down Expand Up @@ -430,7 +484,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
if (valid) {
return result;
} else {
return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]);
return queueEntries.map((queueEntry) => [queueEntry.ID, { status: EventProcessingStatus.Done }]);
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/publishEvent.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ const publishEvent = async (
event.namespace = config.namespace;
}
}
_addEventsToContext(tx, events);
if (config.insertEventsBeforeCommit && !skipInsertEventsBeforeCommit) {
_registerHandlerAndAddEvents(tx, events, skipBroadcast);
_registerHandler(tx, skipBroadcast);
} else {
let result;
tx._skipEventQueueBroadcast = skipBroadcast;
Expand All @@ -87,10 +88,7 @@ const publishEvent = async (
}
};

const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
tx._eventQueue ??= { events: [], handlerRegistered: false };
tx._eventQueue.events = tx._eventQueue.events.concat(events);

const _registerHandler = (tx, skipBroadcast) => {
if (tx._eventQueue.handlerRegistered) {
return;
}
Expand All @@ -106,6 +104,11 @@ const _registerHandlerAndAddEvents = (tx, events, skipBroadcast) => {
});
};

const _addEventsToContext = (tx, events) => {
tx._eventQueue ??= { events: [], handlerRegistered: false };
tx._eventQueue.events = tx._eventQueue.events.concat(events);
};

module.exports = {
publishEvent,
};
Loading