Skip to content
5 changes: 1 addition & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
## 1.2.2 (August 25, 2021)
* Added delay timer property in re-assemble action.

## 1.2.1 (July 23, 2021)
## 1.3.1 (July 23, 2021)
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)

## 1.2.0 (July 9, 2021)
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
### Re-assemble Messages

Inverse of the split action: Given a stream of incoming messages a sum message is generated.
Has 3 different behaviour variants:
* Only specify group size and no delay timer. A message is emitted once the group size is reached for the given group. Should less message then the given group size arrive, then the group is silently discarded.
* Only specify delay timer and no group size. All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
* Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
Has 3 different behaviour variants(options):
* Use Group Size: A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group is silently discarded.
* Use Timeout: All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
* Use Group Size and Timeout: Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.

Supported:
* Messages can be re-ordered in the flow
Expand All @@ -107,7 +107,7 @@ Limitations:
* All groups must have one or more messages. (i.e. No groups of size 0).
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
* All messages must arrive within the same container lifetime.
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
If all the messages in the group do not arrive, then the group will be silently discarded.
* The group is dropped if there are any unexpected restarts to the container.
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
Expand Down
248 changes: 210 additions & 38 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ async function timer(this_) {
}
}

async function processAction(msg) {
async function processAction(msg,cfg) {
const mode = cfg.mode
const storage = new ObjectStorageWrapperExtended(this);
const {
groupSize,
Expand All @@ -30,9 +31,16 @@ async function processAction(msg) {
if (!messageData) {
incomingData[messageId] = undefined;
}
if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
if(mode == 'groupSize'){
if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
};
if(mode == 'timeout'){
if (msg.body.timersec <= 0) {
throw new Error('Delay timer must be a positive integer.');
}
};
const {
messageGroup,
messageGroupId,
Expand Down Expand Up @@ -64,45 +72,25 @@ async function processAction(msg) {
);

// when group sized is defined || when both group size and delay timer are defined
if(messageGroupSize !== '' && messageGroupSize !== undefined){
if(groupList.includes(groupId)){
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
for(var key in groupElements){
if(groupElements[key].groupId === groupId){
groupElements[key].groupSize == messagesNumberSeen
groupElements[key].messageData == incomingData
}
}
}
else{
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId)
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
if(mode == 'groupSize'){
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter(def => def != groupId);
groupElements = groupElements.filter(def => def.groupId != groupId);
}
}
// When delay timer is defined and no group size defined
if(messageGroupSize === undefined && msg.body.timersec !== undefined){

// in case no delay was defined
// When delay timer option is selected
if(mode == 'timeout'){

// delay timer
var delay = msg.body.timersec
if(delay >= 40000){delay == 40000}
clearTimeout(timeHandle);
Expand All @@ -112,12 +100,11 @@ async function processAction(msg) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
for(var key in groupElements){
if(groupElements[key].groupId === groupId){
groupElements[key].groupSize == messagesNumberSeen
groupElements[key].messageData == incomingData
}
}
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
currentGroup[0].groupSize = messagesNumberSeen
currentGroup[0].messageData = incomingData
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
groupElements.push(currentGroup[0]);
}
else{
parsedMessageGroup.messages.forEach((message) => {
Expand All @@ -127,9 +114,194 @@ async function processAction(msg) {
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
}
if(messageGroupSize === undefined && msg.body.timersec === undefined){
throw new Error('Either group size or delay timer need to be defined');

// When both groupSize and delay timer option is selected
if(mode == 'groupSize&timeout'){

var delay = msg.body.timersec
if(delay >= 40000){delay == 40000}
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);

if(groupList.includes(groupId)){
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
currentGroup[0].groupSize = messagesNumberSeen
currentGroup[0].messageData = incomingData
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
groupElements.push(currentGroup[0]);
}
else{
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId)
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter(def => def != groupId);
groupElements = groupElements.filter(def => def.groupId != groupId);
}
}
}

exports.process = processAction;
//----------------------------------------------------------------------------------------------------------------------------
//Dynamic drop-down logic starts hier

async function dynList(cfg) {
let list = {
"groupSize": "Use Group Size",
"timeout": "Use Timeout",
"groupSize&timeout": "Use Group Size and Timeout"
};
return list;
}

async function getMetaModel(cfg) {
let meta = '';
if(cfg.mode === "groupSize"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"groupSize": {
"type": "number",
"required": true,
"title": "Number of messages produced by splitter",
"order": 3
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 2
}
}
},
"out": {
"type": "object"
}
};
}
else if(cfg.mode === "timeout"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"timersec": {
"type": "number",
"required": true,
"help":{
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
},
"title": "Delay timer(in ms)",
"order": 3
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 2
}
}
},
"out": {
"type": "object"
}
};
}
else if(cfg.mode === "groupSize&timeout"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"groupSize": {
"type": "number",
"required": false,
"title": "Number of messages produced by splitter",
"order": 3
},
"timersec": {
"type": "number",
"required": false,
"help":{
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
},
"title": "Delay timer(in ms)",
"order": 2
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 1
}
}
},
"out": {
"type": "object"
}
};
}
return meta;
}
module.exports = {
process: processAction,
getMetaModel: getMetaModel,
dynList: dynList
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
"devDependencies": {
"@elastic.io/component-logger": "0.0.1",
"chai": "^4.2.0",
"chai": "4.2.0",
"chai-as-promised": "7.1.1",
"eslint": "7.6.0",
"eslint-config-airbnb-base": "14.2.0",
Expand Down
10 changes: 5 additions & 5 deletions spec/reassemble.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('Split on JSONata ', () => {
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'groupSize'});
// eslint-disable-next-line no-unused-expressions
expect(self.emit.calledOnce).to.be.true;
expect(self.emit.lastCall.args[1].body).to.deep.equal({
Expand Down Expand Up @@ -88,7 +88,7 @@ describe('Split on JSONata ', () => {
},
};

await expect(reassemble.process.call(self, msg, {})).to.eventually.be.rejectedWith('Size must be a positive integer.');
await expect(reassemble.process.call(self, msg, {mode: 'groupSize'})).to.eventually.be.rejectedWith('Size must be a positive integer.');
});

it('Interleaved Case with duplicate deliveries', async () => {
Expand Down Expand Up @@ -152,7 +152,7 @@ describe('Split on JSONata ', () => {
nock('https://ma.estr').delete('/objects/2').reply(200, {});

// eslint-disable-next-line no-await-in-loop
await reassemble.process.call(self, { body: msgBodies[i] }, {});
await reassemble.process.call(self, { body: msgBodies[i] }, {mode: 'groupSize'});
// eslint-disable-next-line default-case
switch (i) {
case i <= 3:
Expand Down Expand Up @@ -214,7 +214,7 @@ describe('Split on JSONata ', () => {
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'groupSize'});
// eslint-disable-next-line no-unused-expressions
expect(self.emit.calledOnce).to.be.true;
expect(self.emit.lastCall.args[1].body).to.deep.equal({
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('Split on JSONata ', () => {

const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'timeout'});

// timersec + 0,5 second
await sleep(1500);
Expand Down