Skip to content

Commit ee66bea

Browse files
soccermaxMax Gruenfelder
andauthored
Propagate also on cds.context (#443)
* propagate * beta 2 --------- Co-authored-by: Max Gruenfelder <maximilian.gruenfelder@scheer-group.com>
1 parent 6e75a22 commit ee66bea

File tree

10 files changed

+173
-148
lines changed

10 files changed

+173
-148
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1010
### Added
1111

1212
- [CAP Queue] Add support for defining successor and failed events of event handlers. See documentation for how to use it.
13-
- [CAP Queue] Allow to propagate cds.context properties (e.g. features). This can be configured per event (`cds.env.requires[<SERVICE>].queued.propagatedContextProperties = ["features"]`)
13+
- [CAP Queue] Allow to propagate cds.context properties (e.g. features). This can be configured per event (`cds.env.requires[<SERVICE>].queued.propagateContextProperties = ["features"]`)
1414

1515
### Fixed
1616

package-lock.json

Lines changed: 115 additions & 98 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/event-queue",
3-
"version": "2.1.0-beta.1",
3+
"version": "2.1.0-beta.2",
44
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
55
"main": "src/index.js",
66
"types": "src/index.d.ts",

src/config.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const ALLOWED_EVENT_OPTIONS_AD_HOC = [
6464
"checkForNextChunk",
6565
"retryFailedAfter",
6666
"propagateHeaders",
67-
"propagatedContextProperties",
67+
"propagateContextProperties",
6868
"retryOpenAfter",
6969
"multiInstanceProcessing",
7070
"kind",
@@ -518,7 +518,7 @@ class Config {
518518
? Object.fromEntries(new Map(event.appInstances.map((a) => [a, true])))
519519
: null;
520520
event.propagateHeaders = event.propagateHeaders ?? [];
521-
event.propagatedContextProperties = event.propagatedContextProperties ?? [];
521+
event.propagateContextProperties = event.propagateContextProperties ?? [];
522522
event.retryFailedAfter = event.retryFailedAfter ?? DEFAULT_RETRY_AFTER;
523523
event.retryOpenAfter = event.retryOpenAfter ?? DEFAULT_RETRY_AFTER;
524524
}

src/outbox/EventQueueGenericOutboxHandler.js

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
6464
}
6565
} else {
6666
for (const actionName in genericClusterEvents) {
67-
const reg = new cds.Request({
67+
const req = new cds.Request({
6868
event: EVENT_QUEUE_ACTIONS.CLUSTER,
6969
user: this.context.user,
7070
eventQueue: {
@@ -77,14 +77,14 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
7777
this.#clusterByDataProperty(actionName, genericClusterEvents[actionName], propertyName, cb),
7878
},
7979
});
80-
const clusterResult = await this.__srvUnboxed.tx(this.context).send(reg);
80+
const clusterResult = await this.__srvUnboxed.tx(this.context).send(req);
8181
if (this.#validateCluster(clusterResult)) {
8282
Object.assign(clusterMap, clusterResult);
8383
} else {
8484
this.logger.error(
8585
"cluster result of handler is not valid. Check the documentation for the expected structure. Continuing without clustering!",
8686
{
87-
handler: reg.event,
87+
handler: req.event,
8888
clusterResult: JSON.stringify(clusterResult),
8989
}
9090
);
@@ -95,7 +95,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
9595
}
9696

9797
for (const actionName in specificClusterEvents) {
98-
const reg = new cds.Request({
98+
const req = new cds.Request({
9999
event: `${EVENT_QUEUE_ACTIONS.CLUSTER}.${actionName}`,
100100
user: this.context.user,
101101
eventQueue: {
@@ -108,14 +108,14 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
108108
this.#clusterByDataProperty(actionName, specificClusterEvents[actionName], propertyName, cb),
109109
},
110110
});
111-
const clusterResult = await this.__srvUnboxed.tx(this.context).send(reg);
111+
const clusterResult = await this.__srvUnboxed.tx(this.context).send(req);
112112
if (this.#validateCluster(clusterResult)) {
113113
Object.assign(clusterMap, clusterResult);
114114
} else {
115115
this.logger.error(
116116
"cluster result of handler is not valid. Check the documentation for the expected structure. Continuing without clustering!",
117117
{
118-
handler: reg.event,
118+
handler: req.event,
119119
clusterResult: JSON.stringify(clusterResult),
120120
}
121121
);
@@ -267,12 +267,12 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
267267
return payload;
268268
}
269269

270-
const { reg, userId } = this.#buildDispatchData(this.context, payload, {
270+
const { req, userId } = this.#buildDispatchData(payload, {
271271
queueEntries: [queueEntry],
272272
});
273-
reg.event = handlerName;
274-
await this.#setContextUser(this.context, userId, reg);
275-
const data = await this.__srvUnboxed.tx(this.context).send(reg);
273+
req.event = handlerName;
274+
await this.#setContextUser(this.context, userId, req);
275+
const data = await this.__srvUnboxed.tx(this.context).send(req);
276276
if (data) {
277277
payload.data = data;
278278
return payload;
@@ -288,12 +288,12 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
288288
return await super.hookForExceededEvents(exceededEvent);
289289
}
290290

291-
const { reg, userId } = this.#buildDispatchData(this.context, exceededEvent.payload, {
291+
const { req, userId } = this.#buildDispatchData(exceededEvent.payload, {
292292
queueEntries: [exceededEvent],
293293
});
294-
await this.#setContextUser(this.context, userId, reg);
295-
reg.event = handlerName;
296-
await this.__srvUnboxed.tx(this.context).send(reg);
294+
await this.#setContextUser(this.context, userId, req);
295+
req.event = handlerName;
296+
await this.__srvUnboxed.tx(this.context).send(req);
297297
}
298298

299299
// NOTE: Currently not exposed to CAP service; we wait for a valid use case
@@ -326,40 +326,47 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
326326

327327
async processPeriodicEvent(processContext, key, queueEntry) {
328328
const { actionName } = config.normalizeSubType(this.eventType, this.eventSubType);
329-
const reg = new cds.Event({ event: actionName, eventQueue: { processor: this, key, queueEntries: [queueEntry] } });
330-
await this.#setContextUser(processContext, config.userId, reg);
331-
await this.__srvUnboxed.tx(processContext).emit(reg);
329+
const req = new cds.Event({ event: actionName, eventQueue: { processor: this, key, queueEntries: [queueEntry] } });
330+
await this.#setContextUser(processContext, config.userId, req);
331+
await this.__srvUnboxed.tx(processContext).emit(req);
332332
}
333333

334-
#buildDispatchData(context, payload, { key, queueEntries } = {}) {
334+
#buildDispatchData(payload, { key, queueEntries } = {}) {
335335
const { useEventQueueUser } = this.eventConfig;
336336
const userId = useEventQueueUser ? config.userId : payload.contextUser;
337-
const reg = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload);
337+
const req = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload);
338338
const invocationFn = payload._fromSend ? "send" : "emit";
339-
delete reg._fromSend;
340-
delete reg.contextUser;
341-
reg.eventQueue = { processor: this, key, queueEntries, payload };
342-
return { reg, userId, invocationFn };
339+
delete req._fromSend;
340+
delete req.contextUser;
341+
req.eventQueue = { processor: this, key, queueEntries, payload };
342+
343+
if (this.eventConfig.propagateContextProperties?.length && this.transactionMode === "isolated" && cds.context) {
344+
for (const prop of this.eventConfig.propagateContextProperties) {
345+
req[prop] && (cds.context[prop] = req[prop]);
346+
}
347+
}
348+
349+
return { req, userId, invocationFn };
343350
}
344351

345-
async #setContextUser(context, userId, reg) {
352+
async #setContextUser(context, userId, req) {
346353
const authInfo = await common.getAuthContext(context.tenant);
347354
context.user = new cds.User.Privileged({
348355
id: userId,
349356
authInfo,
350357
tokenInfo: authInfo?.token,
351358
});
352-
if (reg) {
353-
reg.user = context.user;
359+
if (req) {
360+
req.user = context.user;
354361
}
355362
}
356363

357364
async processEvent(processContext, key, queueEntries, payload) {
358365
let statusTuple;
359-
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
366+
const { userId, invocationFn, req } = this.#buildDispatchData(payload, { key, queueEntries });
360367
try {
361-
await this.#setContextUser(processContext, userId, reg);
362-
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
368+
await this.#setContextUser(processContext, userId, req);
369+
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](req);
363370
statusTuple = this.#determineResultStatus(result, queueEntries);
364371
} catch (err) {
365372
this.logger.error("error processing outboxed service call", err, {
@@ -374,7 +381,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
374381
]);
375382
}
376383

377-
await this.#publishFollowupEvents(processContext, reg, statusTuple);
384+
await this.#publishFollowupEvents(processContext, req, statusTuple);
378385
return statusTuple;
379386
}
380387

src/outbox/eventQueueAsOutbox.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ function outboxed(srv, customOpts) {
4343

4444
const outboxOpts = config.getEventConfig(CDS_EVENT_TYPE, subType, srvConfig.namespace);
4545
const eventHeaders = getPropagatedHeaders(outboxOpts, req);
46-
const contextProperties = getPropagatedContextProperties(outboxOpts, req);
46+
const contextProperties = getPropagateContextProperties(outboxOpts, req);
4747
if (["persistent-outbox", "persistent-queue"].includes(outboxOpts.kind)) {
4848
await _mapToEventAndPublish(req, srvConfig.namespace, subType, eventHeaders, contextProperties);
4949
return;
@@ -81,8 +81,8 @@ const getPropagatedHeaders = (config, req) => {
8181
return Object.assign(propagateHeaders, req.headers);
8282
};
8383

84-
const getPropagatedContextProperties = (config, req) => {
85-
return config.propagatedContextProperties.reduce((properties, name) => {
84+
const getPropagateContextProperties = (config, req) => {
85+
return config.propagateContextProperties.reduce((properties, name) => {
8686
if (name in req.tx.context) {
8787
properties[name] = req.tx.context[name];
8888
}

test/__snapshots__/eventQueueOutbox.test.js.snap

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
4141
"namespace": "default",
4242
"parallelEventProcessing": 5,
4343
"priority": "medium",
44+
"propagateContextProperties": [],
4445
"propagateHeaders": [],
45-
"propagatedContextProperties": [],
4646
"retryAttempts": 20,
4747
"retryFailedAfter": 300000,
4848
"retryOpenAfter": 300000,
@@ -70,8 +70,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true ad-hoc events overwrite se
7070
"namespace": "default",
7171
"parallelEventProcessing": 5,
7272
"priority": "medium",
73+
"propagateContextProperties": [],
7374
"propagateHeaders": [],
74-
"propagatedContextProperties": [],
7575
"retryAttempts": 20,
7676
"retryFailedAfter": 300000,
7777
"retryOpenAfter": 300000,
@@ -99,8 +99,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true config is not overwritten
9999
"namespace": "default",
100100
"parallelEventProcessing": 5,
101101
"priority": "medium",
102+
"propagateContextProperties": [],
102103
"propagateHeaders": [],
103-
"propagatedContextProperties": [],
104104
"retryAttempts": 20,
105105
"retryFailedAfter": 300000,
106106
"retryOpenAfter": 300000,
@@ -141,8 +141,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true custom options should win
141141
"namespace": "default",
142142
"parallelEventProcessing": 5,
143143
"priority": "medium",
144+
"propagateContextProperties": [],
144145
"propagateHeaders": [],
145-
"propagatedContextProperties": [],
146146
"retryAttempts": 20,
147147
"retryFailedAfter": 300000,
148148
"retryOpenAfter": 300000,
@@ -182,8 +182,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true map config to event-queue
182182
"namespace": "default",
183183
"parallelEventProcessing": 5,
184184
"priority": "medium",
185+
"propagateContextProperties": [],
185186
"propagateHeaders": [],
186-
"propagatedContextProperties": [],
187187
"retryAttempts": 20,
188188
"retryFailedAfter": 300000,
189189
"retryOpenAfter": 300000,
@@ -222,8 +222,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
222222
"load": 60,
223223
"namespace": "default",
224224
"priority": "medium",
225+
"propagateContextProperties": [],
225226
"propagateHeaders": [],
226-
"propagatedContextProperties": [],
227227
"retryFailedAfter": 300000,
228228
"retryOpenAfter": 300000,
229229
"subType": "NotificationServicePeriodic.main",
@@ -260,8 +260,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
260260
"load": 60,
261261
"namespace": "default",
262262
"priority": "medium",
263+
"propagateContextProperties": [],
263264
"propagateHeaders": [],
264-
"propagatedContextProperties": [],
265265
"retryFailedAfter": 300000,
266266
"retryOpenAfter": 300000,
267267
"subType": "NotificationServicePeriodic.main",
@@ -289,8 +289,8 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true periodic events inherit co
289289
"load": 60,
290290
"namespace": "default",
291291
"priority": "medium",
292+
"propagateContextProperties": [],
292293
"propagateHeaders": [],
293-
"propagatedContextProperties": [],
294294
"retryFailedAfter": 300000,
295295
"retryOpenAfter": 300000,
296296
"subType": "NotificationServicePeriodic.main",

test/__snapshots__/initialize.test.js.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ exports[`initialize read yaml config file 1`] = `
1616
"namespace": "default",
1717
"parallelEventProcessing": 5,
1818
"priority": "medium",
19+
"propagateContextProperties": [],
1920
"propagateHeaders": [],
20-
"propagatedContextProperties": [],
2121
"retryFailedAfter": 300000,
2222
"retryOpenAfter": 300000,
2323
"subType": "Task",
@@ -37,8 +37,8 @@ exports[`initialize read yaml config file 1`] = `
3737
"namespace": "default",
3838
"parallelEventProcessing": 1,
3939
"priority": "medium",
40+
"propagateContextProperties": [],
4041
"propagateHeaders": [],
41-
"propagatedContextProperties": [],
4242
"retryFailedAfter": 300000,
4343
"retryOpenAfter": 300000,
4444
"subType": "CommunicationSystem",

test/asset/outboxProject/srv/service/standard-service.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class StandardService extends cds.Service {
1212
data: req.data,
1313
user: req.user.id,
1414
headers: req.headers,
15-
features: req.features,
15+
features: cds.context?.features,
1616
});
1717
});
1818
});

test/eventQueueOutbox.test.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,9 @@ cds.env.requires.ContextProperties = {
148148
impl: path.join(basePath, "srv/service/standard-service.js"),
149149
queued: {
150150
kind: "persistent-queue",
151-
propagatedContextProperties: ["notExisting"],
151+
propagateContextProperties: ["notExisting"],
152152
events: {
153-
main2: { propagatedContextProperties: ["features"] },
153+
main2: { propagateContextProperties: ["features"] },
154154
},
155155
},
156156
};
@@ -2481,6 +2481,7 @@ describe("event-queue outbox", () => {
24812481
});
24822482
const payload = JSON.parse(event.payload);
24832483
expect(payload.features).toEqual({ a: true, b: true });
2484+
await commitAndOpenNew();
24842485
await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.main2`);
24852486
expect(loggerMock).actionCalled("main2", { features: { a: true, b: true } });
24862487
await commitAndOpenNew();

0 commit comments

Comments
 (0)