Skip to content

Commit 5a886b8

Browse files
committed
fixing bugs
1 parent f9bad9f commit 5a886b8

File tree

4 files changed

+41
-48
lines changed

4 files changed

+41
-48
lines changed

README.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
9393

9494
Inverse of the split action: Given a stream of incoming messages a sum message is generated.
9595
Has 3 different behaviour variants(options):
96-
* Use Group Size: A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group is silently discarded.
97-
* Use Timeout: All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
98-
* Use Group Size and Timeout: Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
96+
* Produce Groups of Fixed Size (Don't Emit Partial Groups): A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group will not be emitted.
97+
* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages at which point messages will be emitted for each group.
98+
* Produce Groups of Fixed Size (Emit Partial Groups): Specify both group size and delay timer. Once a group is complete, that group will be emitted. Once there are no more incoming messages, then partially completed groups will also be emitted.
9999

100100
Supported:
101101
* Messages can be re-ordered in the flow
@@ -106,8 +106,7 @@ Supported:
106106
Limitations:
107107
* All groups must have one or more messages. (i.e. No groups of size 0).
108108
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
109-
* All messages must arrive within the same container lifetime.
110-
If all the messages in the group do not arrive, then the group will be silently discarded.
109+
If all the messages in the group do not arrive, then the group will not be emitted..
111110
* The group is dropped if there are any unexpected restarts to the container.
112111
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
113112
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.

component.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@
4646
"fields": {
4747
"mode": {
4848
"viewClass": "SelectView",
49-
"label": "Select option",
49+
"label": "Behavior",
5050
"required": true,
51-
"model": {"groupSize": "Use Group Size", "timeout": "Use Timeout", "groupSize&timeout": "Use Group Size and Timeout"},
52-
"prompt": "Select type"
51+
"model": {"groupSize": "Produce Groups of Fixed Size (Don't Emit Partial Groups)", "timeout": "Group All Incoming Messages", "groupSize&timeout": "Produce Groups of Fixed Size (Emit Partial Groups)"},
52+
"prompt": "Select behavior"
5353
}
5454
}
5555
}

lib/actions/reassemble.js

Lines changed: 28 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,27 @@ const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapp
44

55
let timeHandle;
66
let groupList = [];
7-
let groupElements = [{ groupSize: undefined, groupId: undefined, messageData: undefined }];
87
let delay;
98

109
async function timer(this_) {
11-
for (let i = 1; i < groupElements.length; i += 1) {
12-
this_.emit('data', messages.newMessageWithBody(groupElements[i]));
10+
for (let i = 0; i < groupList.length; i += 1) {
11+
const storage = new ObjectStorageWrapperExtended(this_);
12+
// eslint-disable-next-line no-await-in-loop
13+
const results = await storage.lookupParsedObjectById(groupList[i]);
14+
const incomingData = {};
15+
results.messages.forEach((message) => {
16+
incomingData[message.messageId] = message.messageData;
17+
});
18+
19+
// eslint-disable-next-line no-await-in-loop
20+
await storage.deleteObjectById(groupList[i]);
21+
22+
// eslint-disable-next-line no-await-in-loop
23+
await this_.emit('data', messages.newMessageWithBody({
24+
groupSize: results.messages.length,
25+
groupId: results.messages[0].groupId,
26+
messageData: incomingData,
27+
}));
1328
}
1429
}
1530

@@ -72,7 +87,7 @@ async function processAction(msg, cfg) {
7287
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
7388
);
7489

75-
// when group sized is defined || when both group size and delay timer are defined
90+
// when grouSized option is selected
7691
if (mode === 'groupSize') {
7792
if (messagesNumberSeen >= messageGroupSize) {
7893
parsedMessageGroup.messages.forEach((message) => {
@@ -90,28 +105,14 @@ async function processAction(msg, cfg) {
90105

91106
// When delay timer option is selected
92107
if (mode === 'timeout') {
93-
// delay timer
94108
delay = msg.body.timersec;
95109
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
96110
clearTimeout(timeHandle);
97111
timeHandle = setTimeout(timer, delay, this);
98-
99-
if (groupList.includes(groupId)) {
100-
parsedMessageGroup.messages.forEach((message) => {
101-
incomingData[message.messageId] = message.messageData;
102-
});
103-
// update incoming messageData in current group
104-
const currentGroup = groupElements.filter((x) => x.groupId === groupId);
105-
currentGroup[0].groupSize = messagesNumberSeen;
106-
currentGroup[0].messageData = incomingData;
107-
groupElements = groupElements.filter((x) => x.groupId !== groupId);
108-
groupElements.push(currentGroup[0]);
112+
if (groupList.includes(messageGroupId)) {
113+
// no action required
109114
} else {
110-
parsedMessageGroup.messages.forEach((message) => {
111-
incomingData[message.messageId] = message.messageData;
112-
});
113-
groupList.push(groupId);
114-
groupElements.push({ groupSize: messagesNumberSeen, groupId, messageData: incomingData });
115+
groupList.push(messageGroupId);
115116
}
116117
}
117118

@@ -122,23 +123,12 @@ async function processAction(msg, cfg) {
122123
clearTimeout(timeHandle);
123124
timeHandle = setTimeout(timer, delay, this);
124125

125-
if (groupList.includes(groupId)) {
126-
parsedMessageGroup.messages.forEach((message) => {
127-
incomingData[message.messageId] = message.messageData;
128-
});
129-
// update incoming messageData in current group
130-
const currentGroup = groupElements.filter((x) => x.groupId === groupId);
131-
currentGroup[0].groupSize = messagesNumberSeen;
132-
currentGroup[0].messageData = incomingData;
133-
groupElements = groupElements.filter((x) => x.groupId !== groupId);
134-
groupElements.push(currentGroup[0]);
126+
if (groupList.includes(messageGroupId)) {
127+
// no action required
135128
} else {
136-
parsedMessageGroup.messages.forEach((message) => {
137-
incomingData[message.messageId] = message.messageData;
138-
});
139-
groupList.push(groupId);
140-
groupElements.push({ groupSize: messagesNumberSeen, groupId, messageData: incomingData });
129+
groupList.push(messageGroupId);
141130
}
131+
142132
if (messagesNumberSeen >= messageGroupSize) {
143133
parsedMessageGroup.messages.forEach((message) => {
144134
incomingData[message.messageId] = message.messageData;
@@ -151,8 +141,7 @@ async function processAction(msg, cfg) {
151141
}));
152142
await storage.deleteObjectById(messageGroupId);
153143
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
154-
groupList = groupList.filter((def) => def !== groupId);
155-
groupElements = groupElements.filter((def) => def.groupId !== groupId);
144+
groupList = groupList.filter((def) => def !== messageGroupId);
156145
}
157146
}
158147
}
@@ -285,7 +274,7 @@ async function getMetaModel(cfg) {
285274
},
286275
});
287276
}
288-
return true;
277+
return false;
289278
}
290279
module.exports = {
291280
process: processAction,

spec/reassemble.spec.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ describe('Split on JSONata ', () => {
261261
.reply(200, { messages: [{ msg123: undefined }], messageIdsSeen: { msg123: 'msg123' } });
262262

263263
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
264+
const getMessageGroup2 = nock('https://ma.estr')
265+
.get('/objects/group123')
266+
.reply(200, { messages: [{ groupId: 'group123' }], messageIdsSeen: { msg123: 'msg123' } });
267+
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});
264268

265269
await reassemble.process.call(self, msg, { mode: 'timeout' });
266270

@@ -272,7 +276,6 @@ describe('Split on JSONata ', () => {
272276
groupSize: 1,
273277
groupId: 'group123',
274278
messageData: {
275-
msg123: undefined,
276279
undefined,
277280
},
278281
});
@@ -283,5 +286,7 @@ describe('Split on JSONata ', () => {
283286
expect(putMessageGroup.isDone()).to.equal(true);
284287
expect(getMessageGroup1.isDone()).to.equal(true);
285288
expect(putMessageGroup1.isDone()).to.equal(true);
289+
expect(getMessageGroup2.isDone()).to.equal(true);
290+
expect(deleteMessageGroup.isDone()).to.equal(true);
286291
});
287292
});

0 commit comments

Comments
 (0)