Skip to content

Commit 8093c2e

Browse files
Reduce max delay to 20sec (#91)
1 parent f5f3461 commit 8093c2e

File tree

9 files changed

+5821
-635
lines changed

9 files changed

+5821
-635
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ commands:
7272
jobs:
7373
test:
7474
docker:
75-
- image: circleci/node:14-stretch
75+
- image: circleci/node:16-stretch
7676
steps:
7777
- checkout
7878
- node/install:
7979
node-version: << pipeline.parameters.node-version >>
8080
- run:
8181
name: Audit Dependencies
82-
command: npm audit --audit-level=high
82+
command: npm audit --production --audit-level=high
8383
- node/install-packages:
8484
cache-path: ./node_modules
8585
override-ci-command: npm install
@@ -88,7 +88,7 @@ jobs:
8888
command: npm test
8989
build:
9090
docker:
91-
- image: circleci/node:14-stretch
91+
- image: circleci/node:16-stretch
9292
user: root
9393
steps:
9494
- checkout

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 1.4.2 (October 07, 2022)
2+
* Update Sailor version to 2.7.0
3+
* Update @elastic.io/maester-client version to 4.0.3
4+
* Get rid of vulnerabilities in dependencies
5+
* Fix message processing
6+
* Maximum `Delay timer` in `Re-assembled message` action reduced to 20 sec
7+
18
## 1.4.1 (April 08, 2022)
29
* Update Sailor version to 2.6.27
310
* Get rid of vulnerabilities in dependencies

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ If all the messages in the group do not arrive, then the group will not be emitt
114114
* The group is dropped if there are any unexpected restarts to the container.
115115
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
116116
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
117-
* The delay timer can not exceed 40,000 milliseconds. If more than this maximum is given, then this maximum will be used instead.
117+
* The delay timer can not exceed 20,000 milliseconds. If more than this maximum is given, then this maximum will be used instead.
118118

119119
#### List of Expected Config fields
120120
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.

component.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"title": "Splitter",
3-
"version": "1.4.1",
43
"description": "Splits a message into multiple messages.",
4+
"version": "1.4.2",
55
"actions": {
66
"split": {
77
"deprecated": true,
@@ -57,4 +57,4 @@
5757
}
5858
}
5959
}
60-
}
60+
}

lib/actions/reassemble.js

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ async function timer(this_) {
1111
for (let i = 0; i < groupList.length; i += 1) {
1212
const storage = new ObjectStorageWrapperExtended(this_);
1313
// eslint-disable-next-line no-await-in-loop
14-
const results = await storage.lookupParsedObjectById(groupList[i]);
14+
const results = await storage.lookupObjectById(groupList[i]);
1515
const incomingData = {};
1616
results.messages.forEach((message) => {
1717
incomingData[message.messageId] = message.messageData;
@@ -37,6 +37,7 @@ async function processAction(msg, cfg) {
3737
groupId,
3838
messageId = uuidv4(),
3939
messageData,
40+
timersec = 20000,
4041
} = msg.body;
4142
const incomingData = {};
4243
const object = {
@@ -54,7 +55,7 @@ async function processAction(msg, cfg) {
5455
}
5556
}
5657
if (mode === 'timeout') {
57-
if (msg.body.timersec <= 0) {
58+
if (timersec <= 0) {
5859
throw new Error('Delay timer must be a positive integer.');
5960
}
6061
}
@@ -74,21 +75,19 @@ async function processAction(msg, cfg) {
7475
this.logger.info('Existed Group found. Added message');
7576
this.logger.info(`Saved messages: ${Object.keys(messageGroup.messageIdsSeen).join(', ')}`);
7677
}
77-
78-
const parsedMessageGroup = await storage.lookupParsedObjectById(messageGroupId);
78+
const parsedMessageGroup = await storage.lookupObjectById(messageGroupId);
7979
const filteredMessages = parsedMessageGroup.messages
8080
.filter((message) => message.messageId !== messageId);
8181
filteredMessages.push(object);
8282
parsedMessageGroup.messages = filteredMessages;
83-
await storage.updateObject(messageGroupId, parsedMessageGroup);
83+
await storage.updateObjectById(messageGroupId, parsedMessageGroup);
8484
const messagesNumberSeen = Object.keys(parsedMessageGroup.messageIdsSeen).length;
8585

8686
this.logger.info(
8787
`Saw message ${messageId} of group ${groupId}.
8888
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
8989
);
90-
91-
// when grouSized option is selected
90+
// when groupSized option is selected
9291
if (mode === 'groupSize') {
9392
if (messagesNumberSeen >= messageGroupSize) {
9493
parsedMessageGroup.messages.forEach((message) => {
@@ -106,7 +105,7 @@ async function processAction(msg, cfg) {
106105

107106
// When delay timer option is selected
108107
if (mode === 'timeout') {
109-
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
108+
delay = (timersec >= 20000) ? 20000 : timersec;
110109
clearTimeout(timeHandle);
111110
timeHandle = setTimeout(timer, delay, this);
112111
if (!groupList.includes(messageGroupId)) {
@@ -116,7 +115,7 @@ async function processAction(msg, cfg) {
116115

117116
// When both groupSize and delay timer option is selected
118117
if (mode === 'groupSize&timeout') {
119-
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
118+
delay = (timersec >= 20000) ? 20000 : timersec;
120119
clearTimeout(timeHandle);
121120
timeHandle = setTimeout(timer, delay, this);
122121

@@ -142,7 +141,7 @@ async function processAction(msg, cfg) {
142141
}
143142

144143
//-------------------------------------------------------------------------------------
145-
// Dynamic drop-down logic starts hier
144+
// Dynamic drop-down logic starts here
146145

147146
async function getMetaModel(cfg) {
148147
if (cfg.mode === 'groupSize') {

lib/actions/utils-wrapper/ObjectStorageWrapperExtended.js

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const { ObjectStorageWrapper } = require('@elastic.io/maester-client/dist/ObjectStorageWrapper');
1+
const { ObjectStorageWrapper } = require('@elastic.io/maester-client');
22

33
class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
44
constructor(context) {
@@ -8,11 +8,6 @@ class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
88
this.TTL_TWO_DAYS = 172800;
99
}
1010

11-
async lookupParsedObjectById(messageGroupId) {
12-
const messageGroup = await this.lookupObjectById(messageGroupId);
13-
return JSON.parse(messageGroup);
14-
}
15-
1611
async createMessageGroupIfNotExists(externalId, messageGroupSize) {
1712
this.logger.info('Processing creation of the new message group');
1813
const messageGroups = await this.lookupObjectsByQueryParameters(
@@ -27,7 +22,7 @@ class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
2722
messages: [],
2823
messageIdsSeen: {},
2924
};
30-
const { objectId: messageGroupId } = await this.createObject(
25+
const messageGroupId = await this.createObject(
3126
newMessageGroup, [{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
3227
[], this.TTL_TWO_DAYS,
3328
);
@@ -38,19 +33,19 @@ class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
3833
}
3934
this.logger.info('MessageGroup found');
4035
const messageGroupId = messageGroups[0].objectId;
41-
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
36+
const messageGroup = await this.lookupObjectById(messageGroupId);
4237
return {
43-
messageGroup: parsedMessageGroup, messageGroupSize, messageGroupId, isCreated: false,
38+
messageGroup, messageGroupSize, messageGroupId, isCreated: false,
4439
};
4540
}
4641

4742
async createNewObjectInMessageGroup(object, messageGroupId) {
4843
this.logger.info('Processing creation of the new object');
49-
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
44+
const messageGroup = await this.lookupObjectById(messageGroupId);
5045
this.logger.info('...Updating message group');
51-
parsedMessageGroup.messageIdsSeen[object.messageId] = object.messageId;
52-
return this.updateObject(messageGroupId, {
53-
...parsedMessageGroup, messages: [...parsedMessageGroup.messages, object],
46+
messageGroup.messageIdsSeen[object.messageId] = object.messageId;
47+
return this.updateObjectById(messageGroupId, {
48+
...messageGroup, messages: [...messageGroup.messages, object],
5449
});
5550
}
5651
}

0 commit comments

Comments
 (0)