Skip to content

Commit d5edc49

Browse files
authored
Add Re-assemble (#42)
* Apply AirBnB Style Rules. * 1.1.0 * Update dependencies.
1 parent d3de46c commit d5edc49

File tree

14 files changed

+1006
-759
lines changed

14 files changed

+1006
-759
lines changed

.eslintrc.js

Lines changed: 6 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -1,159 +1,7 @@
1-
'use strict';
2-
3-
const ERROR = 'error';
4-
const WARN = 'warn';
5-
const ALWAYS = 'always';
6-
const NEVER = 'never';
7-
81
module.exports = {
9-
'env': {
10-
es6: true,
11-
node: true,
12-
jasmine: true,
13-
mocha: true
14-
},
15-
"parserOptions": {
16-
"ecmaVersion": 8
17-
},
18-
'extends': 'airbnb-base',
19-
'rules': {
20-
'indent': [
21-
ERROR,
22-
4,
23-
{
24-
SwitchCase: 1
25-
}
26-
],
27-
'linebreak-style': ERROR,
28-
'quotes': [
29-
WARN,
30-
'single'
31-
],
32-
'semi': [
33-
ERROR,
34-
ALWAYS
35-
],
36-
'func-names': ERROR,
37-
'no-empty': ERROR,
38-
'no-empty-function': ERROR,
39-
'brace-style': [
40-
ERROR,
41-
'1tbs',
42-
{ allowSingleLine: true }
43-
],
44-
'no-multiple-empty-lines': ERROR,
45-
'no-multi-spaces': ERROR,
46-
'one-var': [
47-
ERROR,
48-
NEVER
49-
],
50-
'quote-props': [
51-
WARN,
52-
'consistent-as-needed'
53-
],
54-
'key-spacing': ERROR,
55-
'space-unary-ops': [
56-
ERROR,
57-
{
58-
words: true,
59-
nonwords: false
60-
}
61-
],
62-
'no-spaced-func': ERROR,
63-
'space-before-function-paren': [
64-
ERROR,
65-
{
66-
anonymous: ALWAYS,
67-
named: NEVER
68-
}
69-
],
70-
'arrow-body-style': [
71-
WARN,
72-
'as-needed'
73-
],
74-
'array-bracket-spacing': ERROR,
75-
'space-in-parens': ERROR,
76-
'comma-dangle': ERROR,
77-
'no-trailing-spaces': ERROR,
78-
'yoda': ERROR,
79-
'max-len': [
80-
ERROR,
81-
120
82-
],
83-
'camelcase': [
84-
ERROR,
85-
{
86-
properties: 'never'
87-
}
88-
],
89-
'new-cap': [
90-
WARN,
91-
{
92-
capIsNewExceptions: ['Q']
93-
}
94-
],
95-
'comma-style': ERROR,
96-
'curly': ERROR,
97-
'object-curly-spacing': [
98-
WARN,
99-
ALWAYS
100-
],
101-
'object-curly-newline': [
102-
ERROR,
103-
{
104-
ObjectExpression: {
105-
minProperties: 1
106-
},
107-
ObjectPattern: {
108-
minProperties: 5
109-
}
110-
}
111-
],
112-
'object-property-newline': ERROR,
113-
'template-curly-spacing': ERROR,
114-
'dot-notation': ERROR,
115-
'dot-location': [
116-
ERROR,
117-
'property'
118-
],
119-
'func-style': [
120-
ERROR,
121-
'declaration',
122-
{
123-
allowArrowFunctions: true
124-
}
125-
],
126-
'eol-last': ERROR,
127-
'space-infix-ops': ERROR,
128-
'keyword-spacing': ERROR,
129-
'space-before-blocks': ERROR,
130-
'no-invalid-this': ERROR,
131-
'consistent-this': ERROR,
132-
'no-this-before-super': ERROR,
133-
'no-unreachable': ERROR,
134-
'no-sparse-arrays': ERROR,
135-
'array-callback-return': ERROR,
136-
'strict': [
137-
WARN,
138-
'global'
139-
],
140-
'eqeqeq': ERROR,
141-
'no-use-before-define': WARN,
142-
'no-undef': ERROR,
143-
'no-unused-vars': WARN,
144-
'no-mixed-spaces-and-tabs': ERROR,
145-
'operator-linebreak': [
146-
ERROR,
147-
'before'
148-
],
149-
'no-console': [
150-
WARN,
151-
{
152-
'allow': [
153-
'warn',
154-
'error'
155-
]
156-
}
157-
]
158-
}
159-
};
2+
'extends': 'airbnb-base',
3+
'env': {
4+
'mocha': true,
5+
'node': true,
6+
}
7+
};

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.1.0 (May 7, 2020)
2+
* Add re-assemble action
3+
* Update dependencies
4+
15
## 1.0.6 (March 26, 2020)
26
* Add support for new getFlowVariables and handlePathrough jsonata functions
37

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,37 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
9494
#### List of Expected Config fields
9595
```Split Property``` - use this field to choose a separator.
9696

97+
### Re-assemble Messages
98+
**(Beta)**
99+
100+
Inverse of the split action: Given a stream of incoming messages that which have
101+
been split apart by a split action (or similar), produce one message once all
102+
message parts have arrived.
103+
104+
Supported:
105+
* Messages can be re-ordered in the flow
106+
* If messages are re-delivered as a result of the platform's at once delivery guarantee, does not trigger false positives
107+
* Messages from one original message can be interleaved with messages from another original message
108+
(e.g. Two overlapping webhook calls arrive and the flow has components where parallel processing > 1.)
109+
110+
Limitations:
111+
* All groups must have one or more messages. (i.e. No groups of size 0).
112+
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported)
113+
* If all messages in a group fail to arrive at the re-assemble action (because one message suffered an error earlier in the flow)
114+
then this component will silently discard the group.
115+
* All messages must arrive within the same container lifetime.
116+
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
117+
* The group is dropped if there are any unexpected restarts to the container.
118+
* Size of the group must be known by all group members.
119+
* Messages are only emitter when all parts arrive. Emitting a message only when the first part arrives isn't supported.
120+
* The contents of data that are picked up by the sub-messages aren't passed forward to future steps.
121+
122+
#### List of Expected Config fields
123+
```groupSize``` - Number of messages in the group
124+
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.
125+
```messageId``` - Id for a message to distinguish it from other messages in the group.
126+
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
127+
97128
## Known limitations (common for the component)
98129
No.
99130

component.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,38 @@
2727
"viewClass": "JSONataView"
2828
}
2929
}
30+
},
31+
"reassemble": {
32+
"title": "Re-assemble Messages",
33+
"help": {
34+
"description": "Inverse operation from split: Re-assemble a group of messages that have previously been split",
35+
"link": "#re-assemble-messages"
36+
},
37+
"main": "./lib/actions/reassemble.js",
38+
"fields": {},
39+
"metadata": {
40+
"in": {
41+
"type": "object",
42+
"required": true,
43+
"properties": {
44+
"groupSize": {
45+
"type": "number",
46+
"required": true,
47+
"title": "Number of messages produced by splitter"
48+
},
49+
"groupId": {
50+
"type": "string",
51+
"required": true,
52+
"title": "Unique ID to describe the group"
53+
},
54+
"messageId": {
55+
"type": "string",
56+
"required": true,
57+
"title": "Unique ID to describe this message"
58+
}
59+
}
60+
}
61+
}
3062
}
3163
}
3264
}

lib/actions/reassemble.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
const { messages } = require('elasticio-node');
2+
3+
const groupsSeen = {};
4+
5+
async function processAction(msg) {
6+
const {
7+
groupSize,
8+
groupId,
9+
messageId,
10+
} = msg.body;
11+
12+
if (groupSize <= 0) {
13+
throw new Error('Size must be a positive integer.');
14+
}
15+
16+
if (!groupsSeen[groupId]) {
17+
groupsSeen[groupId] = {
18+
groupSize,
19+
messageIdsSeen: new Set(),
20+
};
21+
}
22+
23+
groupsSeen[groupId].messageIdsSeen.add(messageId);
24+
const numberSeen = groupsSeen[groupId].messageIdsSeen.size;
25+
26+
this.logger.info(
27+
`Saw message ${messageId} of group ${groupId} Currently the group has ${numberSeen} of ${groupSize} message(s).`,
28+
);
29+
30+
if (numberSeen >= groupSize) {
31+
await this.emit('data', messages.newMessageWithBody({
32+
groupSize,
33+
groupId,
34+
}));
35+
delete groupsSeen[groupId];
36+
}
37+
}
38+
39+
exports.process = processAction;

lib/actions/splitOnJsonata.js

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,26 @@ const { messages } = require('elasticio-node');
66
* @this processAction
77
*/
88
async function processAction(msg, cfg) {
9-
const split = JsonataTransform.jsonataTransform(msg, cfg, this);
9+
const split = JsonataTransform.jsonataTransform(msg, cfg, this);
1010

11-
if (!Array.isArray(split)) {
12-
await this.emit('error', new Error('The evaluated expression must be an array.'));
13-
return;
14-
}
11+
if (!Array.isArray(split)) {
12+
await this.emit('error', new Error('The evaluated expression must be an array.'));
13+
return;
14+
}
1515

16-
if (_.find(split, (elem) => !_.isObject(elem))) {
17-
await this.emit('error', new Error('Splitting arrays of objects only!'));
18-
return;
19-
}
16+
if (_.find(split, (elem) => !_.isObject(elem))) {
17+
await this.emit('error', new Error('Splitting arrays of objects only!'));
18+
return;
19+
}
2020

21-
this.logger.info('Splitting the incoming message into %s messages', split.length);
21+
this.logger.info('Splitting the incoming message into %s messages', split.length);
2222

23-
for (let i = 0; i < split.length; i += 1) {
24-
if (split[i]) {
25-
// eslint-disable-next-line no-await-in-loop
26-
await this.emit('data', messages.newMessageWithBody(split[i]));
27-
}
23+
for (let i = 0; i < split.length; i += 1) {
24+
if (split[i]) {
25+
// eslint-disable-next-line no-await-in-loop
26+
await this.emit('data', messages.newMessageWithBody(split[i]));
2827
}
28+
}
2929
}
3030

3131
exports.process = processAction;

0 commit comments

Comments
 (0)