Skip to content

Commit 8bf67e5

Browse files
committed
refactor message formatter
1 parent 992767c commit 8bf67e5

File tree

10 files changed

+224
-132
lines changed

10 files changed

+224
-132
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
## Unreleased
44

5-
- fix old common exports
5+
## [17.0.0] - 2025-02-08
6+
7+
- refactor message formatting, not sure if it breaking or not, but now it behaves as expected when formatting with multiple listeners
8+
- fix activity discard run when activity has completed executing but not yet reached end, status `executed`
9+
- use es5 trailing comma
610

711
## [16.2.2] - 2024-12-26
812

dist/MessageFormatter.js

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@ var _Errors = require("./error/Errors.js");
1010
var _smqp = require("smqp");
1111
const kOnMessage = Symbol.for('onMessage');
1212
const kExecution = Symbol.for('execution');
13-
function Formatter(element, formatQ) {
13+
const EXEC_ROUTING_KEY = 'run._formatting.exec';
14+
15+
/**
16+
* Message formatter used to enrich an element run message before continuing to the next run message
17+
* @param {import('types').ElementBase} element
18+
*/
19+
function Formatter(element) {
1420
const {
1521
id,
1622
broker,
@@ -19,16 +25,19 @@ function Formatter(element, formatQ) {
1925
this.id = id;
2026
this.broker = broker;
2127
this.logger = logger;
22-
this.formatQ = formatQ;
2328
this[kOnMessage] = this._onMessage.bind(this);
2429
}
30+
31+
/**
32+
* Format message
33+
* @param {import('types').MessageElement} message
34+
* @param {CallableFunction} callback
35+
*/
2536
Formatter.prototype.format = function format(message, callback) {
2637
const correlationId = this._runId = (0, _shared.getUniqueId)(message.fields.routingKey);
2738
const consumerTag = '_formatter-' + correlationId;
28-
const formatQ = this.formatQ;
29-
formatQ.queueMessage({
30-
routingKey: '_formatting.exec'
31-
}, {}, {
39+
const broker = this.broker;
40+
broker.publish('format', EXEC_ROUTING_KEY, {}, {
3241
correlationId,
3342
persistent: false
3443
});
@@ -37,11 +46,11 @@ Formatter.prototype.format = function format(message, callback) {
3746
formatKey: message.fields.routingKey,
3847
runMessage: (0, _messageHelper.cloneMessage)(message),
3948
callback,
40-
pending: [],
49+
pending: new Set(),
4150
formatted: false,
4251
executeMessage: null
4352
};
44-
formatQ.consume(this[kOnMessage], {
53+
broker.consume('format-run-q', this[kOnMessage], {
4554
consumerTag,
4655
prefetch: 100
4756
});
@@ -53,41 +62,33 @@ Formatter.prototype._onMessage = function onMessage(routingKey, message) {
5362
pending,
5463
executeMessage
5564
} = this[kExecution];
56-
const asyncFormatting = pending.length;
57-
switch (routingKey) {
58-
case '_formatting.exec':
59-
if (message.properties.correlationId !== correlationId) return message.ack();
60-
if (!asyncFormatting) {
61-
message.ack();
62-
return this._complete(message);
63-
}
64-
this[kExecution].executeMessage = message;
65-
break;
66-
default:
67-
{
68-
message.ack();
69-
const endRoutingKey = message.content?.endRoutingKey;
70-
if (endRoutingKey) {
71-
this._decorate(message.content);
72-
pending.push(message);
73-
return this._debug(`start formatting ${formatKey} message content with formatter ${routingKey}`);
74-
}
75-
if (asyncFormatting) {
76-
const {
77-
isError,
78-
message: startMessage
79-
} = this._popFormatStart(pending, routingKey);
80-
if (startMessage) startMessage.ack();
81-
if (isError) {
82-
return this._complete(message, true);
83-
}
84-
}
85-
this._decorate(message.content);
86-
this._debug(`format ${message.fields.routingKey} message content with formatter ${routingKey}`);
87-
if (executeMessage && asyncFormatting && !pending.length) {
88-
this._complete(message);
89-
}
65+
const asyncFormatting = pending.size;
66+
if (routingKey === EXEC_ROUTING_KEY) {
67+
if (message.properties.correlationId !== correlationId) return message.ack();
68+
message.ack();
69+
if (!asyncFormatting) {
70+
return this._complete(message);
71+
}
72+
this[kExecution].executeMessage = message;
73+
} else {
74+
message.ack();
75+
const endRoutingKey = message.content?.endRoutingKey;
76+
if (endRoutingKey) {
77+
this._enrich(message.content);
78+
pending.add(message);
79+
return this._debug(`start formatting ${formatKey} message content with formatter ${routingKey}`);
80+
}
81+
if (asyncFormatting) {
82+
const isError = this._popFormatStart(pending, routingKey).isError;
83+
if (isError) {
84+
return this._complete(message, true);
9085
}
86+
}
87+
this._enrich(message.content);
88+
this._debug(`format ${message.fields.routingKey} message content with formatter ${routingKey}`);
89+
if (executeMessage && !pending.size) {
90+
this._complete(message);
91+
}
9192
}
9293
};
9394
Formatter.prototype._complete = function complete(message, isError) {
@@ -109,7 +110,7 @@ Formatter.prototype._complete = function complete(message, isError) {
109110
}
110111
return callback(null, runMessage.content, formatted);
111112
};
112-
Formatter.prototype._decorate = function decorate(withContent) {
113+
Formatter.prototype._enrich = function enrich(withContent) {
113114
const content = this[kExecution].runMessage.content;
114115
for (const key in withContent) {
115116
switch (key) {
@@ -134,20 +135,19 @@ Formatter.prototype._decorate = function decorate(withContent) {
134135
}
135136
};
136137
Formatter.prototype._popFormatStart = function popFormattingStart(pending, routingKey) {
137-
for (let idx = 0; idx < pending.length; idx++) {
138-
const msg = pending[idx];
138+
for (const msg of pending) {
139139
const {
140140
endRoutingKey,
141141
errorRoutingKey = '#.error'
142142
} = msg.content;
143143
if (endRoutingKey && (0, _smqp.getRoutingKeyPattern)(endRoutingKey).test(routingKey)) {
144144
this._debug(`completed formatting ${msg.fields.routingKey} message content with formatter ${routingKey}`);
145-
pending.splice(idx, 1);
145+
pending.delete(msg);
146146
return {
147147
message: msg
148148
};
149149
} else if ((0, _smqp.getRoutingKeyPattern)(errorRoutingKey).test(routingKey)) {
150-
pending.splice(idx, 1);
150+
pending.delete(msg);
151151
return {
152152
isError: true,
153153
message: msg

dist/activity/Activity.js

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,7 @@ Object.defineProperties(Activity.prototype, {
156156
get() {
157157
let formatter = this[kFormatter];
158158
if (formatter) return formatter;
159-
const broker = this.broker;
160-
formatter = this[kFormatter] = new _MessageFormatter.Formatter({
161-
id: this.id,
162-
broker,
163-
logger: this.logger
164-
}, broker.getQueue('format-run-q'));
159+
formatter = this[kFormatter] = new _MessageFormatter.Formatter(this);
165160
return formatter;
166161
}
167162
},
@@ -430,7 +425,13 @@ Activity.prototype._discardRun = function discardRun() {
430425
if (!status) return;
431426
const execution = this[kExec].get('execution');
432427
if (execution && !execution.completed) return;
428+
let discardRoutingKey = 'run.discard';
433429
switch (status) {
430+
case 'executed':
431+
{
432+
discardRoutingKey = 'run.discarded';
433+
break;
434+
}
434435
case 'end':
435436
case 'executing':
436437
case 'error':
@@ -442,7 +443,9 @@ Activity.prototype._discardRun = function discardRun() {
442443
if (this.extensions) this.extensions.deactivate((0, _messageHelper.cloneMessage)(stateMessage));
443444
const broker = this.broker;
444445
broker.getQueue('run-q').purge();
445-
broker.publish('run', 'run.discard', (0, _messageHelper.cloneContent)(stateMessage.content));
446+
broker.publish('run', discardRoutingKey, (0, _messageHelper.cloneContent)(stateMessage.content), {
447+
correlationId: stateMessage.properties.correlationId
448+
});
446449
this[kConsuming] = true;
447450
this._consumeRunQ();
448451
};
@@ -609,9 +612,11 @@ Activity.prototype._onRunMessage = function onRunMessage(routingKey, message, me
609612
const preStatus = this.status;
610613
this.status = 'formatting';
611614
return this.formatter.format(message, (err, formattedContent, formatted) => {
612-
if (err) return this.emitFatal(err, message.content);
613-
if (formatted) message.content = formattedContent;
614615
this.status = preStatus;
616+
if (err) {
617+
return this.emitFatal(err, message.content);
618+
}
619+
if (formatted) message.content = formattedContent;
615620
this._continueRunMessage(routingKey, message, messageProperties);
616621
});
617622
};

eslint.config.js

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,6 @@ export default [
7575
},
7676
rules,
7777
},
78-
{
79-
files: ['src/**/*.js'],
80-
languageOptions: {
81-
globals: {
82-
...globals['shared-node-browser'],
83-
...globals.es6,
84-
},
85-
},
86-
},
8778
{
8879
files: ['test/**/*.js'],
8980
languageOptions: {

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "bpmn-elements",
3-
"version": "16.2.2",
3+
"version": "17.0.0",
44
"description": "Executable workflow elements based on BPMN 2.0",
55
"type": "module",
66
"main": "./dist/index.js",
@@ -104,6 +104,6 @@
104104
},
105105
"dependencies": {
106106
"@0dep/piso": "^2.4.0",
107-
"smqp": "^9.0.4"
107+
"smqp": "^9.0.6"
108108
}
109109
}

src/MessageFormatter.js

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,31 @@ import { getRoutingKeyPattern } from 'smqp';
66
const kOnMessage = Symbol.for('onMessage');
77
const kExecution = Symbol.for('execution');
88

9-
const execRoutingKey = '_formatting.exec';
9+
const EXEC_ROUTING_KEY = 'run._formatting.exec';
1010

11-
export function Formatter(element, formatQ) {
11+
/**
12+
* Message formatter used to enrich an element run message before continuing to the next run message
13+
* @param {import('types').ElementBase} element
14+
*/
15+
export function Formatter(element) {
1216
const { id, broker, logger } = element;
1317
this.id = id;
1418
this.broker = broker;
1519
this.logger = logger;
16-
this.formatQ = formatQ;
1720
this[kOnMessage] = this._onMessage.bind(this);
1821
}
1922

23+
/**
24+
* Format message
25+
* @param {import('types').ElementBrokerMessage} message
26+
* @param {CallableFunction} callback
27+
*/
2028
Formatter.prototype.format = function format(message, callback) {
2129
const correlationId = (this._runId = getUniqueId(message.fields.routingKey));
2230
const consumerTag = '_formatter-' + correlationId;
23-
const formatQ = this.formatQ;
31+
const broker = this.broker;
2432

25-
console.log({
26-
format: message.fields.routingKey,
27-
q: formatQ.messageCount,
28-
c: formatQ.consumerCount,
29-
b: this.broker.getExchange('format').bindings,
30-
});
31-
32-
formatQ.queueMessage({ routingKey: execRoutingKey }, {}, { correlationId, persistent: false });
33+
broker.publish('format', EXEC_ROUTING_KEY, {}, { correlationId, persistent: false });
3334

3435
this[kExecution] = {
3536
correlationId,
@@ -41,7 +42,7 @@ Formatter.prototype.format = function format(message, callback) {
4142
executeMessage: null,
4243
};
4344

44-
formatQ.consume(this[kOnMessage], {
45+
broker.consume('format-run-q', this[kOnMessage], {
4546
consumerTag,
4647
prefetch: 100,
4748
});
@@ -50,12 +51,11 @@ Formatter.prototype.format = function format(message, callback) {
5051
Formatter.prototype._onMessage = function onMessage(routingKey, message) {
5152
const { formatKey, correlationId, pending, executeMessage } = this[kExecution];
5253
const asyncFormatting = pending.size;
53-
console.log({ f: routingKey, p: asyncFormatting });
5454

55-
if (routingKey === execRoutingKey) {
55+
if (routingKey === EXEC_ROUTING_KEY) {
5656
if (message.properties.correlationId !== correlationId) return message.ack();
57+
message.ack();
5758
if (!asyncFormatting) {
58-
message.ack();
5959
return this._complete(message);
6060
}
6161
this[kExecution].executeMessage = message;
@@ -71,8 +71,7 @@ Formatter.prototype._onMessage = function onMessage(routingKey, message) {
7171
}
7272

7373
if (asyncFormatting) {
74-
const { isError, message: startMessage } = this._popFormatStart(pending, routingKey);
75-
if (startMessage) startMessage.ack();
74+
const isError = this._popFormatStart(pending, routingKey).isError;
7675

7776
if (isError) {
7877
return this._complete(message, true);
@@ -82,7 +81,7 @@ Formatter.prototype._onMessage = function onMessage(routingKey, message) {
8281
this._enrich(message.content);
8382
this._debug(`format ${message.fields.routingKey} message content with formatter ${routingKey}`);
8483

85-
if (executeMessage && asyncFormatting && !pending.size) {
84+
if (executeMessage && !pending.size) {
8685
this._complete(message);
8786
}
8887
}

0 commit comments

Comments
 (0)