Skip to content

Commit 24bf657

Browse files
authored
Added maester-library to re-assembling messages action (#70)
Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
1 parent c358bfe commit 24bf657

File tree

7 files changed

+687
-60
lines changed

7 files changed

+687
-60
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
## 1.2.1 (July 23, 2021)
2+
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
3+
14
## 1.2.0 (July 9, 2021)
2-
* Add Ability to pass data from the individual messages to re-assembled message
5+
* Add Ability to pass data from the individual messages to `Re-assembled message` action
36

47
## 1.1.9 (February 12, 2021)
58
* Update sailor version to 2.6.24

component.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"title": "Splitter",
3-
"version": "1.2.0",
3+
"version": "1.2.1",
44
"description": "Splits a message into multiple messages.",
55
"buildType":"docker",
66
"actions": {

lib/actions/reassemble.js

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,71 @@
1+
// eslint-disable-next-line
12
const { messages } = require('elasticio-node');
2-
3-
const groupsSeen = {};
3+
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');
44

55
async function processAction(msg) {
6+
const storage = new ObjectStorageWrapperExtended(this);
67
const {
78
groupSize,
89
groupId,
910
messageId,
1011
messageData,
1112
} = msg.body;
13+
const incomingData = {};
14+
const object = {
15+
messageId,
16+
groupId,
17+
messageData,
18+
};
1219

1320
if (groupSize <= 0) {
1421
throw new Error('Size must be a positive integer.');
1522
}
23+
if (!messageData) {
24+
incomingData[messageId] = undefined;
25+
}
1626

17-
if (!groupsSeen[groupId]) {
18-
groupsSeen[groupId] = {
19-
groupSize,
20-
messageIdsSeen: new Set(),
21-
incomingData: {},
22-
};
27+
const {
28+
messageGroup,
29+
messageGroupId,
30+
messageGroupSize,
31+
isCreated,
32+
} = await storage.createMessageGroupIfNotExists(groupId, groupSize);
33+
34+
if (isCreated) {
35+
await storage.createNewObjectInMessageGroup(object, messageGroupId);
36+
this.logger.info('New Group created. Added message');
37+
}
38+
if (!isCreated) {
39+
await storage.createNewObjectInMessageGroup(object, messageGroupId);
40+
this.logger.info('Existed Group found. Added message');
41+
this.logger.info(`Saved messages: ${Object.keys(messageGroup.messageIdsSeen).join(', ')}`);
2342
}
2443

25-
groupsSeen[groupId].messageIdsSeen.add(messageId);
26-
groupsSeen[groupId].incomingData[messageId] = messageData;
27-
const numberSeen = groupsSeen[groupId].messageIdsSeen.size;
44+
const parsedMessageGroup = await storage.lookupParsedObjectById(messageGroupId);
45+
const filteredMessages = parsedMessageGroup.messages
46+
.filter((message) => message.messageId !== messageId);
47+
filteredMessages.push(object);
48+
parsedMessageGroup.messages = filteredMessages;
49+
await storage.updateObject(messageGroupId, parsedMessageGroup);
50+
const messagesNumberSeen = Object.keys(parsedMessageGroup.messageIdsSeen).length;
2851

2952
this.logger.info(
30-
`Saw message ${messageId} of group ${groupId} Currently the group has ${numberSeen} of ${groupSize} message(s).`,
53+
`Saw message ${messageId} of group ${groupId}.
54+
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
3155
);
3256

33-
if (numberSeen >= groupSize) {
57+
if (messagesNumberSeen >= messageGroupSize) {
58+
parsedMessageGroup.messages.forEach((message) => {
59+
incomingData[message.messageId] = message.messageData;
60+
});
61+
3462
await this.emit('data', messages.newMessageWithBody({
3563
groupSize,
3664
groupId,
37-
messageData: groupsSeen[groupId].incomingData,
65+
messageData: incomingData,
3866
}));
39-
delete groupsSeen[groupId];
67+
await storage.deleteObjectById(messageGroupId);
68+
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
4069
}
4170
}
4271

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
const { ObjectStorageWrapper } = require('@elastic.io/maester-client/dist/ObjectStorageWrapper');
2+
3+
class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
4+
constructor(context) {
5+
super(context);
6+
this.logger = context.logger;
7+
this.EXTERNAL_ID_QUERY_HEADER_NAME = 'externalid';
8+
this.TTL_TWO_DAYS = 172800;
9+
}
10+
11+
async lookupParsedObjectById(messageGroupId) {
12+
const messageGroup = await this.lookupObjectById(messageGroupId);
13+
return JSON.parse(messageGroup);
14+
}
15+
16+
async createMessageGroupIfNotExists(externalId, messageGroupSize) {
17+
this.logger.info('Processing creation of the new message group');
18+
const messageGroups = await this.lookupObjectsByQueryParameters(
19+
[{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
20+
);
21+
if (messageGroups.length > 1) {
22+
throw new Error('Several message groups with the same ids can not exist');
23+
}
24+
if (!messageGroups.length) {
25+
this.logger.info('No message groups found');
26+
const newMessageGroup = {
27+
messages: [],
28+
messageIdsSeen: {},
29+
};
30+
const { objectId: messageGroupId } = await this.createObject(
31+
newMessageGroup, [{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
32+
[], this.TTL_TWO_DAYS,
33+
);
34+
this.logger.info('Created new message group');
35+
return {
36+
messageGroup: newMessageGroup, messageGroupSize, messageGroupId, isCreated: true,
37+
};
38+
}
39+
this.logger.info('MessageGroup found');
40+
const messageGroupId = messageGroups[0].objectId;
41+
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
42+
return {
43+
messageGroup: parsedMessageGroup, messageGroupSize, messageGroupId, isCreated: false,
44+
};
45+
}
46+
47+
async createNewObjectInMessageGroup(object, messageGroupId) {
48+
this.logger.info('Processing creation of the new object');
49+
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
50+
this.logger.info('...Updating message group');
51+
parsedMessageGroup.messageIdsSeen[object.messageId] = object.messageId;
52+
return this.updateObject(messageGroupId, {
53+
...parsedMessageGroup, messages: [...parsedMessageGroup.messages, object],
54+
});
55+
}
56+
}
57+
58+
module.exports = ObjectStorageWrapperExtended;

0 commit comments

Comments
 (0)