Skip to content

Commit 1b2f29c

Browse files
committed
changes in readme file
1 parent 00a71be commit 1b2f29c

File tree

5 files changed

+222
-53
lines changed

5 files changed

+222
-53
lines changed

CHANGELOG.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
## 1.2.2 (August 25, 2021)
2-
* Added delay timer property in re-assemble action.
3-
4-
## 1.2.1 (July 23, 2021)
1+
## 1.3.1 (July 23, 2021)
52
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
63

74
## 1.2.0 (July 9, 2021)

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
9292
### Re-assemble Messages
9393

9494
Inverse of the split action: Given a stream of incoming messages a sum message is generated.
95-
Has 3 different behaviour variants:
96-
* Only specify group size and no delay timer. A message is emitted once the group size is reached for the given group. Should less message then the given group size arrive, then the group is silently discarded.
97-
* Only specify delay timer and no group size. All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
98-
* Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
95+
Has 3 different behaviour variants(options):
96+
* Use Group Size. A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group is silently discarded.
97+
* Use Timeout. All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
98+
* Use Group Size and Timeout: Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
9999

100100
Supported:
101101
* Messages can be re-ordered in the flow
@@ -107,7 +107,7 @@ Limitations:
107107
* All groups must have one or more messages. (i.e. No groups of size 0).
108108
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
109109
* All messages must arrive within the same container lifetime.
110-
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
110+
If all the messages in the group do not arrive, then the group will be silently discarded.
111111
* The group is dropped if there are any unexpected restarts to the container.
112112
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
113113
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.

lib/actions/reassemble.js

Lines changed: 210 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ async function timer(this_) {
1212
}
1313
}
1414

15-
async function processAction(msg) {
15+
async function processAction(msg,cfg) {
16+
const mode = cfg.mode
1617
const storage = new ObjectStorageWrapperExtended(this);
1718
const {
1819
groupSize,
@@ -30,9 +31,16 @@ async function processAction(msg) {
3031
if (!messageData) {
3132
incomingData[messageId] = undefined;
3233
}
33-
if (groupSize <= 0) {
34-
throw new Error('Size must be a positive integer.');
35-
}
34+
if(mode == 'groupSize'){
35+
if (groupSize <= 0) {
36+
throw new Error('Size must be a positive integer.');
37+
}
38+
};
39+
if(mode == 'timeout'){
40+
if (msg.body.timersec <= 0) {
41+
throw new Error('Delay timer must be a positive integer.');
42+
}
43+
};
3644
const {
3745
messageGroup,
3846
messageGroupId,
@@ -64,45 +72,25 @@ async function processAction(msg) {
6472
);
6573

6674
// when group sized is defined || when both group size and delay timer are defined
67-
if(messageGroupSize !== '' && messageGroupSize !== undefined){
68-
if(groupList.includes(groupId)){
69-
parsedMessageGroup.messages.forEach((message) => {
70-
incomingData[message.messageId] = message.messageData;
71-
});
72-
for(var key in groupElements){
73-
if(groupElements[key].groupId === groupId){
74-
groupElements[key].groupSize == messagesNumberSeen
75-
groupElements[key].messageData == incomingData
76-
}
77-
}
78-
}
79-
else{
80-
parsedMessageGroup.messages.forEach((message) => {
81-
incomingData[message.messageId] = message.messageData;
82-
});
83-
groupList.push(groupId)
84-
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
85-
}
75+
if(mode == 'groupSize'){
8676
if (messagesNumberSeen >= messageGroupSize) {
8777
parsedMessageGroup.messages.forEach((message) => {
8878
incomingData[message.messageId] = message.messageData;
8979
});
90-
9180
await this.emit('data', messages.newMessageWithBody({
9281
groupSize,
9382
groupId,
9483
messageData: incomingData,
9584
}));
9685
await storage.deleteObjectById(messageGroupId);
9786
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
98-
groupList = groupList.filter(def => def != groupId);
99-
groupElements = groupElements.filter(def => def.groupId != groupId);
10087
}
10188
}
102-
// When delay timer is defined and no group size defined
103-
if(messageGroupSize === undefined && msg.body.timersec !== undefined){
10489

105-
// in case no delay was defined
90+
// When delay timer option is selected
91+
if(mode == 'timeout'){
92+
93+
// delay timer
10694
var delay = msg.body.timersec
10795
if(delay >= 40000){delay == 40000}
10896
clearTimeout(timeHandle);
@@ -112,12 +100,11 @@ async function processAction(msg) {
112100
parsedMessageGroup.messages.forEach((message) => {
113101
incomingData[message.messageId] = message.messageData;
114102
});
115-
for(var key in groupElements){
116-
if(groupElements[key].groupId === groupId){
117-
groupElements[key].groupSize == messagesNumberSeen
118-
groupElements[key].messageData == incomingData
119-
}
120-
}
103+
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
104+
currentGroup[0].groupSize = messagesNumberSeen
105+
currentGroup[0].messageData = incomingData
106+
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
107+
groupElements.push(currentGroup[0]);
121108
}
122109
else{
123110
parsedMessageGroup.messages.forEach((message) => {
@@ -127,9 +114,194 @@ async function processAction(msg) {
127114
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
128115
}
129116
}
130-
if(messageGroupSize === undefined && msg.body.timersec === undefined){
131-
throw new Error('Either group size or delay timer need to be defined');
117+
118+
// When both groupSize and delay timer option is selected
119+
if(mode == 'groupSize&timeout'){
120+
121+
var delay = msg.body.timersec
122+
if(delay >= 40000){delay == 40000}
123+
clearTimeout(timeHandle);
124+
timeHandle = setTimeout(timer, delay, this);
125+
126+
if(groupList.includes(groupId)){
127+
parsedMessageGroup.messages.forEach((message) => {
128+
incomingData[message.messageId] = message.messageData;
129+
});
130+
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
131+
currentGroup[0].groupSize = messagesNumberSeen
132+
currentGroup[0].messageData = incomingData
133+
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
134+
groupElements.push(currentGroup[0]);
135+
}
136+
else{
137+
parsedMessageGroup.messages.forEach((message) => {
138+
incomingData[message.messageId] = message.messageData;
139+
});
140+
groupList.push(groupId)
141+
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
142+
}
143+
if (messagesNumberSeen >= messageGroupSize) {
144+
parsedMessageGroup.messages.forEach((message) => {
145+
incomingData[message.messageId] = message.messageData;
146+
});
147+
148+
await this.emit('data', messages.newMessageWithBody({
149+
groupSize,
150+
groupId,
151+
messageData: incomingData,
152+
}));
153+
await storage.deleteObjectById(messageGroupId);
154+
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
155+
groupList = groupList.filter(def => def != groupId);
156+
groupElements = groupElements.filter(def => def.groupId != groupId);
157+
}
132158
}
133159
}
134160

135-
exports.process = processAction;
161+
//----------------------------------------------------------------------------------------------------------------------------
162+
//Dynamic drop-down logic starts hier
163+
164+
async function dynList(cfg) {
165+
let list = {
166+
"groupSize": "Use Group Size",
167+
"timeout": "Use Timeout",
168+
"groupSize&timeout": "Use Group Size and Timeout"
169+
};
170+
return list;
171+
}
172+
173+
async function getMetaModel(cfg) {
174+
let meta = '';
175+
if(cfg.mode === "groupSize"){
176+
meta = {
177+
"in": {
178+
"type": "object",
179+
"required": true,
180+
"properties": {
181+
"groupId": {
182+
"type": "string",
183+
"required": true,
184+
"title": "Unique ID to describe the group",
185+
"order": 5
186+
},
187+
"messageId": {
188+
"type": "string",
189+
"required": true,
190+
"title": "Unique ID to describe this message",
191+
"order": 4
192+
},
193+
"groupSize": {
194+
"type": "number",
195+
"required": true,
196+
"title": "Number of messages produced by splitter",
197+
"order": 3
198+
},
199+
"messageData": {
200+
"title": "Message Data",
201+
"required": false,
202+
"type": "object",
203+
"properties": {},
204+
"order": 2
205+
}
206+
}
207+
},
208+
"out": {
209+
"type": "object"
210+
}
211+
};
212+
}
213+
else if(cfg.mode === "timeout"){
214+
meta = {
215+
"in": {
216+
"type": "object",
217+
"required": true,
218+
"properties": {
219+
"groupId": {
220+
"type": "string",
221+
"required": true,
222+
"title": "Unique ID to describe the group",
223+
"order": 5
224+
},
225+
"messageId": {
226+
"type": "string",
227+
"required": true,
228+
"title": "Unique ID to describe this message",
229+
"order": 4
230+
},
231+
"timersec": {
232+
"type": "number",
233+
"required": true,
234+
"help":{
235+
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
236+
},
237+
"title": "Delay timer(in ms)",
238+
"order": 3
239+
},
240+
"messageData": {
241+
"title": "Message Data",
242+
"required": false,
243+
"type": "object",
244+
"properties": {},
245+
"order": 2
246+
}
247+
}
248+
},
249+
"out": {
250+
"type": "object"
251+
}
252+
};
253+
}
254+
else if(cfg.mode === "groupSize&timeout"){
255+
meta = {
256+
"in": {
257+
"type": "object",
258+
"required": true,
259+
"properties": {
260+
"groupId": {
261+
"type": "string",
262+
"required": true,
263+
"title": "Unique ID to describe the group",
264+
"order": 5
265+
},
266+
"messageId": {
267+
"type": "string",
268+
"required": true,
269+
"title": "Unique ID to describe this message",
270+
"order": 4
271+
},
272+
"groupSize": {
273+
"type": "number",
274+
"required": false,
275+
"title": "Number of messages produced by splitter",
276+
"order": 3
277+
},
278+
"timersec": {
279+
"type": "number",
280+
"required": false,
281+
"help":{
282+
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
283+
},
284+
"title": "Delay timer(in ms)",
285+
"order": 2
286+
},
287+
"messageData": {
288+
"title": "Message Data",
289+
"required": false,
290+
"type": "object",
291+
"properties": {},
292+
"order": 1
293+
}
294+
}
295+
},
296+
"out": {
297+
"type": "object"
298+
}
299+
};
300+
}
301+
return meta;
302+
}
303+
module.exports = {
304+
process: processAction,
305+
getMetaModel: getMetaModel,
306+
dynList: dynList
307+
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
},
2323
"devDependencies": {
2424
"@elastic.io/component-logger": "0.0.1",
25-
"chai": "^4.2.0",
25+
"chai": "4.2.0",
2626
"chai-as-promised": "7.1.1",
2727
"eslint": "7.6.0",
2828
"eslint-config-airbnb-base": "14.2.0",

spec/reassemble.spec.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ describe('Split on JSONata ', () => {
5858
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
5959
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});
6060

61-
await reassemble.process.call(self, msg, {});
61+
await reassemble.process.call(self, msg, {mode: 'groupSize'});
6262
// eslint-disable-next-line no-unused-expressions
6363
expect(self.emit.calledOnce).to.be.true;
6464
expect(self.emit.lastCall.args[1].body).to.deep.equal({
@@ -88,7 +88,7 @@ describe('Split on JSONata ', () => {
8888
},
8989
};
9090

91-
await expect(reassemble.process.call(self, msg, {})).to.eventually.be.rejectedWith('Size must be a positive integer.');
91+
await expect(reassemble.process.call(self, msg, {mode: 'groupSize'})).to.eventually.be.rejectedWith('Size must be a positive integer.');
9292
});
9393

9494
it('Interleaved Case with duplicate deliveries', async () => {
@@ -152,7 +152,7 @@ describe('Split on JSONata ', () => {
152152
nock('https://ma.estr').delete('/objects/2').reply(200, {});
153153

154154
// eslint-disable-next-line no-await-in-loop
155-
await reassemble.process.call(self, { body: msgBodies[i] }, {});
155+
await reassemble.process.call(self, { body: msgBodies[i] }, {mode: 'groupSize'});
156156
// eslint-disable-next-line default-case
157157
switch (i) {
158158
case i <= 3:
@@ -214,7 +214,7 @@ describe('Split on JSONata ', () => {
214214
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
215215
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});
216216

217-
await reassemble.process.call(self, msg, {});
217+
await reassemble.process.call(self, msg, {mode: 'groupSize'});
218218
// eslint-disable-next-line no-unused-expressions
219219
expect(self.emit.calledOnce).to.be.true;
220220
expect(self.emit.lastCall.args[1].body).to.deep.equal({
@@ -262,7 +262,7 @@ describe('Split on JSONata ', () => {
262262

263263
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
264264

265-
await reassemble.process.call(self, msg, {});
265+
await reassemble.process.call(self, msg, {mode: 'timeout'});
266266

267267
// timersec + 0,5 second
268268
await sleep(1500);

0 commit comments

Comments
 (0)