Skip to content

Commit b126140

Browse files
committed
Added delay timer property
1 parent 6413c8a commit b126140

File tree

4 files changed

+83
-40
lines changed

4 files changed

+83
-40
lines changed

README.md

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,39 +92,34 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
9292
### Re-assemble Messages
9393
**(Beta)**
9494

95-
Inverse of the split action: Given a stream of incoming messages that which have
96-
been split apart by a split action (or similar), produce one message once all
97-
message parts have arrived.
98-
95+
Inverse of the split action: Given a stream of incoming messages a sum message is generated.
96+
Has 3 different behaviour variants:
97+
* Only specify group size and no delay timer. A message is emitted once the group size is reached for the given group. Should less message then the given group size arrive, then the group is silently discarded.
98+
* Only specify delay timer and no group size. All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
99+
* Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
99100
Supported:
100101
* Messages can be re-ordered in the flow
101102
* If messages are re-delivered as a result of the platform's at once delivery guarantee, does not trigger false positives
102103
* Messages from one original message can be interleaved with messages from another original message
103104
(e.g. Two overlapping webhook calls arrive and the flow has components where parallel processing > 1.)
104-
105105
Limitations:
106-
* All groups must have one or more messages. (i.e. No groups of size 0).
106+
* All groups must have one or more messages. (i.e. No groups of size 0).
107107
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported)
108108
* If all messages in a group fail to arrive at the re-assemble action (because one message suffered an error earlier in the flow)
109109
then this component will silently discard the group.
110-
* All messages must arrive within the same container lifetime.
111-
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
110+
* All messages must arrive within the same container lifetime.
111+
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
112112
* The group is dropped if there are any unexpected restarts to the container.
113-
* Size of the group must be known by all group members.
114-
* Messages are only emitter when all parts arrive. Emitting a message only when the first part arrives isn't supported.
115-
*
116-
113+
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
114+
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
115+
* The delay timer can not exceed 40,000 milliseconds. If more than this maximum is given, then this maximum will be used instead.
117116
#### List of Expected Config fields
118117
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.
119-
120-
```messageId``` - Id for a message to distinguish it from other messages in the group.
118+
```messageId``` - Id for a message to distinguish it from other messages in the group.
121119
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
122-
123-
```messageData``` - object for providing some data derived from the steps between splitting and re-assembling.
124-
120+
```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group.
125121
```groupSize``` - Number of messages in the group.
126-
127-
```Delay timer(in ms)``` - Time the process waits when no incoming messages before emiting(Max 40000 miliseconds)
122+
```Delay timer (in ms)``` - Time the process waits when no incoming messages before emiting (Max 40,000 milliseconds)
128123

129124
## Known limitations (common for the component)
130125
No.

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
"type": "number",
7878
"required": false,
7979
"help":{
80-
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
80+
"description": "Time the process waits when no incoming messages before emiting(Maximum 20,000 milliseconds)"
8181
},
8282
"title": "Delay timer(in ms)",
8383
"order": 2

lib/actions/reassemble.js

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ async function processAction(msg) {
2727
messageData,
2828
};
2929

30-
// in case no delay was defined
31-
var delay = msg.body.timersec ? msg.body.timersec : '40000'
32-
if(delay >= 40000){delay == 40000}
33-
clearTimeout(timeHandle);
34-
timeHandle = setTimeout(timer, delay, this);
35-
3630
if (!messageData) {
3731
incomingData[messageId] = undefined;
3832
}
@@ -68,14 +62,13 @@ async function processAction(msg) {
6862
`Saw message ${messageId} of group ${groupId}.
6963
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
7064
);
71-
if(messageGroupSize !== '' && messageGroupSize !== undefined ){
72-
clearTimeout(timeHandle);
7365

66+
// when group sized is defined || when both group size and delay timer are defined
67+
if(messageGroupSize !== '' && messageGroupSize !== undefined){
7468
if(groupList.includes(groupId)){
7569
parsedMessageGroup.messages.forEach((message) => {
7670
incomingData[message.messageId] = message.messageData;
7771
});
78-
7972
for(var key in groupElements){
8073
if(groupElements[key].groupId === groupId){
8174
groupElements[key].groupSize == messagesNumberSeen
@@ -87,10 +80,8 @@ async function processAction(msg) {
8780
parsedMessageGroup.messages.forEach((message) => {
8881
incomingData[message.messageId] = message.messageData;
8982
});
90-
9183
groupList.push(groupId)
9284
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
93-
9485
}
9586
if (messagesNumberSeen >= messageGroupSize) {
9687
parsedMessageGroup.messages.forEach((message) => {
@@ -107,17 +98,20 @@ async function processAction(msg) {
10798
groupList = groupList.filter(def => def != groupId);
10899
groupElements = groupElements.filter(def => def.groupId != groupId);
109100
}
110-
else{
111-
timeHandle = setTimeout(timer, delay, this);
112-
}
113101
}
114-
else{
102+
// When delay timer is defined and no group size defined
103+
if(messageGroupSize === undefined && msg.body.timersec !== undefined){
104+
105+
// in case no delay was defined
106+
var delay = msg.body.timersec
107+
if(delay >= 40000){delay == 40000}
108+
clearTimeout(timeHandle);
109+
timeHandle = setTimeout(timer, delay, this);
110+
115111
if(groupList.includes(groupId)){
116-
117112
parsedMessageGroup.messages.forEach((message) => {
118113
incomingData[message.messageId] = message.messageData;
119114
});
120-
121115
for(var key in groupElements){
122116
if(groupElements[key].groupId === groupId){
123117
groupElements[key].groupSize == messagesNumberSeen
@@ -126,16 +120,16 @@ async function processAction(msg) {
126120
}
127121
}
128122
else{
129-
130123
parsedMessageGroup.messages.forEach((message) => {
131124
incomingData[message.messageId] = message.messageData;
132125
});
133-
134126
groupList.push(groupId)
135127
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
136-
137128
}
138129
}
130+
if(messageGroupSize === undefined && msg.body.timersec === undefined){
131+
throw new Error('Either group size or delay timer need to be defined');
132+
}
139133
}
140134

141135
exports.process = processAction;

spec/reassemble.spec.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ process.env.ELASTICIO_STEP_ID = 'step_id';
1414
const { expect } = chai;
1515
chai.use(require('chai-as-promised'));
1616

17+
function sleep(ms) {
18+
return new Promise(resolve => setTimeout(resolve, ms));
19+
}
20+
1721
describe('Split on JSONata ', () => {
1822
let self;
1923

@@ -232,4 +236,54 @@ describe('Split on JSONata ', () => {
232236
expect(putMessageGroup1.isDone()).to.equal(true);
233237
expect(deleteMessageGroup.isDone()).to.equal(true);
234238
});
239+
240+
it('Base Case: No group Group Size, emit after 1000 miliseconds if no incoming message', async () => {
241+
const msg = {
242+
body: {
243+
groupId: 'group123',
244+
messageId: 'msg123',
245+
groupSize : undefined,
246+
timersec: 1000
247+
},
248+
};
249+
250+
const getMessageGroups = nock('https://ma.estr').get('/objects?query[externalid]=group123').reply(200, []);
251+
const postMessageGroup = nock('https://ma.estr')
252+
.post('/objects', { messages: [], messageIdsSeen: {} })
253+
.matchHeader('x-query-externalid', 'group123')
254+
.reply(200, { objectId: 'group123' });
255+
const getMessageGroup = nock('https://ma.estr')
256+
.get('/objects/group123')
257+
.reply(200, { messages: [], messageIdsSeen: {} });
258+
const putMessageGroup = nock('https://ma.estr').put('/objects/group123').reply(200, {});
259+
const getMessageGroup1 = nock('https://ma.estr')
260+
.get('/objects/group123')
261+
.reply(200, { messages: [{ msg123: undefined }], messageIdsSeen: { msg123: 'msg123' } })
262+
263+
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
264+
265+
await reassemble.process.call(self, msg, {});
266+
267+
// timersec + 0,5 second
268+
await sleep(1500);
269+
270+
271+
expect(self.emit.calledOnce).to.be.true;
272+
expect(self.emit.lastCall.args[1].body).to.deep.equal({
273+
groupSize: 1,
274+
groupId: 'group123',
275+
messageData: {
276+
msg123: undefined,
277+
undefined,
278+
},
279+
});
280+
281+
expect(getMessageGroups.isDone()).to.equal(true);
282+
expect(postMessageGroup.isDone()).to.equal(true);
283+
expect(getMessageGroup.isDone()).to.equal(true);
284+
expect(putMessageGroup.isDone()).to.equal(true);
285+
expect(getMessageGroup1.isDone()).to.equal(true);
286+
expect(putMessageGroup1.isDone()).to.equal(true);
287+
});
288+
235289
});

0 commit comments

Comments
 (0)