Skip to content

Commit 6413c8a

Browse files
committed
timer_property
1 parent 24bf657 commit 6413c8a

File tree

6 files changed

+125
-4507
lines changed

6 files changed

+125
-4507
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.2.2 (August 25, 2021)
2+
* Added delay timer property in re-assemble action.
3+
14
## 1.2.1 (July 23, 2021)
25
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
36

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,19 @@ If at any point there is more than a 15 second gap in messages, then the group w
112112
* The group is dropped if there are any unexpected restarts to the container.
113113
* Size of the group must be known by all group members.
114114
* Messages are only emitter when all parts arrive. Emitting a message only when the first part arrives isn't supported.
115+
*
115116

116117
#### List of Expected Config fields
117-
```groupSize``` - Number of messages in the group
118-
119118
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.
120119

121120
```messageId``` - Id for a message to distinguish it from other messages in the group.
122121
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
123122

124-
```messageData``` - object for providing some data derived from the steps between splitting and re-assembling
123+
```messageData``` - object for providing some data derived from the steps between splitting and re-assembling.
124+
125+
```groupSize``` - Number of messages in the group.
126+
127+
```Delay timer(in ms)``` - Time the process waits when no incoming messages before emiting(Max 40000 miliseconds)
125128

126129
## Known limitations (common for the component)
127130
No.

component.json

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,37 @@
5050
"properties": {
5151
"groupSize": {
5252
"type": "number",
53-
"required": true,
54-
"title": "Number of messages produced by splitter"
53+
"required": false,
54+
"title": "Number of messages produced by splitter",
55+
"order": 3
5556
},
5657
"groupId": {
5758
"type": "string",
5859
"required": true,
59-
"title": "Unique ID to describe the group"
60+
"title": "Unique ID to describe the group",
61+
"order": 5
6062
},
6163
"messageId": {
6264
"type": "string",
6365
"required": true,
64-
"title": "Unique ID to describe this message"
66+
"title": "Unique ID to describe this message",
67+
"order": 4
6568
},
6669
"messageData": {
6770
"title": "Message Data",
6871
"required": false,
6972
"type": "object",
70-
"properties": {}
73+
"properties": {},
74+
"order": 1
75+
},
76+
"timersec": {
77+
"type": "number",
78+
"required": false,
79+
"help":{
80+
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
81+
},
82+
"title": "Delay timer(in ms)",
83+
"order": 2
7184
}
7285
}
7386
}

lib/actions/reassemble.js

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22
const { messages } = require('elasticio-node');
33
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');
44

5+
let timeHandle;
6+
var groupList = [];
7+
var groupElements = [{groupSize: undefined, groupId: undefined, messageData:undefined}];
8+
9+
async function timer(this_) {
10+
for(var i = 1; i < groupElements.length; i++){
11+
await this_.emit('data', messages.newMessageWithBody( groupElements[i] ))
12+
}
13+
}
14+
515
async function processAction(msg) {
616
const storage = new ObjectStorageWrapperExtended(this);
717
const {
@@ -17,13 +27,18 @@ async function processAction(msg) {
1727
messageData,
1828
};
1929

20-
if (groupSize <= 0) {
21-
throw new Error('Size must be a positive integer.');
22-
}
30+
// in case no delay was defined
31+
var delay = msg.body.timersec ? msg.body.timersec : '40000'
32+
if(delay >= 40000){delay == 40000}
33+
clearTimeout(timeHandle);
34+
timeHandle = setTimeout(timer, delay, this);
35+
2336
if (!messageData) {
2437
incomingData[messageId] = undefined;
2538
}
26-
39+
if (groupSize <= 0) {
40+
throw new Error('Size must be a positive integer.');
41+
}
2742
const {
2843
messageGroup,
2944
messageGroupId,
@@ -53,19 +68,73 @@ async function processAction(msg) {
5368
`Saw message ${messageId} of group ${groupId}.
5469
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
5570
);
71+
if(messageGroupSize !== '' && messageGroupSize !== undefined ){
72+
clearTimeout(timeHandle);
73+
74+
if(groupList.includes(groupId)){
75+
parsedMessageGroup.messages.forEach((message) => {
76+
incomingData[message.messageId] = message.messageData;
77+
});
78+
79+
for(var key in groupElements){
80+
if(groupElements[key].groupId === groupId){
81+
groupElements[key].groupSize == messagesNumberSeen
82+
groupElements[key].messageData == incomingData
83+
}
84+
}
85+
}
86+
else{
87+
parsedMessageGroup.messages.forEach((message) => {
88+
incomingData[message.messageId] = message.messageData;
89+
});
5690

57-
if (messagesNumberSeen >= messageGroupSize) {
58-
parsedMessageGroup.messages.forEach((message) => {
59-
incomingData[message.messageId] = message.messageData;
60-
});
91+
groupList.push(groupId)
92+
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
93+
94+
}
95+
if (messagesNumberSeen >= messageGroupSize) {
96+
parsedMessageGroup.messages.forEach((message) => {
97+
incomingData[message.messageId] = message.messageData;
98+
});
99+
100+
await this.emit('data', messages.newMessageWithBody({
101+
groupSize,
102+
groupId,
103+
messageData: incomingData,
104+
}));
105+
await storage.deleteObjectById(messageGroupId);
106+
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
107+
groupList = groupList.filter(def => def != groupId);
108+
groupElements = groupElements.filter(def => def.groupId != groupId);
109+
}
110+
else{
111+
timeHandle = setTimeout(timer, delay, this);
112+
}
113+
}
114+
else{
115+
if(groupList.includes(groupId)){
116+
117+
parsedMessageGroup.messages.forEach((message) => {
118+
incomingData[message.messageId] = message.messageData;
119+
});
120+
121+
for(var key in groupElements){
122+
if(groupElements[key].groupId === groupId){
123+
groupElements[key].groupSize == messagesNumberSeen
124+
groupElements[key].messageData == incomingData
125+
}
126+
}
127+
}
128+
else{
129+
130+
parsedMessageGroup.messages.forEach((message) => {
131+
incomingData[message.messageId] = message.messageData;
132+
});
61133

62-
await this.emit('data', messages.newMessageWithBody({
63-
groupSize,
64-
groupId,
65-
messageData: incomingData,
66-
}));
67-
await storage.deleteObjectById(messageGroupId);
68-
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
134+
groupList.push(groupId)
135+
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
136+
137+
}
69138
}
70139
}
71140

0 commit comments

Comments
 (0)