Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module.exports = {
'extends': 'airbnb-base',
'env': {
'mocha': true
}
};
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.0.3 (February 17, 2020)

* Fix passthrough bug
* Fix codestyle

## 1.0.2 (December 24, 2019)

* Update sailor version to 2.5.4
Expand Down
28 changes: 15 additions & 13 deletions lib/actions/transform.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/* eslint no-invalid-this: 0 no-console: 0 */

const eioUtils = require('elasticio-node').messages;
const { messages } = require('elasticio-node');
const jsonata = require('@elastic.io/jsonata-moment');

const PASSTHROUGH_BODY_PROPERTY = 'elasticio';
Expand All @@ -12,36 +10,40 @@ const PASSTHROUGH_BODY_PROPERTY = 'elasticio';
* @param cfg configuration that is account information and configuration field values
*/
async function processAction(msg, cfg) {
const expression = cfg.expression;
const { expression } = cfg;
const compiledExpression = jsonata(expression);
handlePassthrough(msg);
this.logger.info('Evaluating expression="%s" on body=%j', expression, msg.body);
const result = compiledExpression.evaluate(msg.body);
// eslint-disable-next-line no-use-before-define
const messageCopy = handlePassthrough(msg);
this.logger.info('Evaluating expression="%s" on body=%j', expression, messageCopy.body);
const result = compiledExpression.evaluate(messageCopy.body);
this.logger.info('Evaluation completed, result=%j', result);
if (result === undefined || result === null || Object.keys(result).length === 0) {
return Promise.resolve();
}
if (typeof result[Symbol.iterator] === 'function') {
// We have an iterator as result
// We have an iterator as result
// eslint-disable-next-line no-restricted-syntax
for (const item of result) {
await this.emit('data', eioUtils.newMessageWithBody(item));
// eslint-disable-next-line no-await-in-loop
await this.emit('data', messages.newMessageWithBody(item));
}
return Promise.resolve();
}
return Promise.resolve(eioUtils.newMessageWithBody(result));
return Promise.resolve(messages.newMessageWithBody(result));
}

function handlePassthrough(message) {
const messageCopy = JSON.parse(JSON.stringify(message));
if (message.passthrough && Object.keys(message.passthrough)) {
if (PASSTHROUGH_BODY_PROPERTY in message.body) {
throw new Error(`${PASSTHROUGH_BODY_PROPERTY} property is reserved \
if you are using passthrough functionality`);
}

message.body.elasticio = {};
Object.assign(message.body.elasticio, message.passthrough);
messageCopy.body.elasticio = {};
Object.assign(messageCopy.body.elasticio, message.passthrough);
}
return message;
return messageCopy;
}

module.exports.process = processAction;
Loading