Skip to content

Commit 7a67926

Browse files
ShkarupaNickstas-fomenko
authored and committed
Issue 180 component updates (#23)
* add read action; test view * add read action; test view; component.json hotfix * add read action; test view; component.json fix action name * add read action; test view; component.json fix read.js * add read action; test view; * add read action; test view; * add read action; test view; add jsonataview for url * add read action; test view; add username & password fields * add read action; add placeholders processing * add read action; add exports * add read action; change placeholders processing * add read action; add logs to placeholders processing * read action; mapping body to url placeholders * read action; mapping body to url placeholders * read action; mapping body to url placeholders * Going to eslint airbnb-base codestyle. * Fixing unit test: - Changing input test data format from json to csv (as expected) - Setting correct output expected number. * add read action * docs draft * add response as attachment for read action * replace extra dependencies with utils functions * add header and required delimiter to output csv for read action * sailor upgraded to 2.2.3 migrated to v2 circleci added await for emit data * Issue 178 fixing oom (#25) * Fixing OOM - piping csv processing stream. * Adding limitations to docs * Update README.md * Update README.md (#27)
1 parent 39eceac commit 7a67926

File tree

6 files changed

+822
-431
lines changed

6 files changed

+822
-431
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ attachment. It can also write a CSV file from the incoming events.
1313

1414
## Environment variables
1515

16-
Component is not using any environment variables.
16+
The component does not have any required environment variables, but we suggest setting `EIO_REQUIRED_RAM_MB`; the recommended value of allocated memory is `512` MB.
1717

1818

1919
## Credentials
@@ -29,6 +29,11 @@ This trigger will fetch the CSV file from a given URL. The address must be acces
2929
to the component. The fetched CSV file will be placed in the attachment part of the
3030
outgoing message.
3131

32+
#### Limitations
33+
34+
You may get a `Component run out of memory and terminated.` error at run time. This means that the component needs more memory; in this case, please set the
35+
`EIO_REQUIRED_RAM_MB` environment variable for the csv-component.
36+
3237
## Actions
3338

3439
### Read CSV attachment
@@ -61,6 +66,7 @@ for that cell. All other properties will be ignored. For example, headers
6166
```
6267

6368
will produce the following `.csv` file:
69+
6470
```
6571
foo,bar
6672
myfoo,mybar

circle.yml

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1-
machine:
2-
services:
3-
- docker
4-
5-
dependencies:
6-
override:
7-
- docker pull elasticio/appbuilder
8-
9-
test:
10-
override:
11-
- chmod 700 .circleci/build_slug.sh
12-
- .circleci/build_slug.sh
13-
1+
version: 2
2+
jobs:
3+
test:
4+
docker:
5+
- image: circleci/node:8-stretch
6+
steps:
7+
- checkout
8+
- restore_cache:
9+
key: dependency-cache-{{ checksum "package.json" }}
10+
- run:
11+
name: Installing Dependencies
12+
command: npm install
13+
- save_cache:
14+
key: dependency-cache-{{ checksum "package.json" }}
15+
paths:
16+
- node_modules
17+
- run:
18+
name: Running Mocha Tests
19+
command: npm test
20+
workflows:
21+
version: 2
22+
build_and_test:
23+
jobs:
24+
- test

lib/actions/write.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ exports.init = function init(cfg) {
4343
});
4444
};
4545

46-
exports.process = function ProcessAction(msg, cfg) {
46+
exports.process = async function ProcessAction(msg, cfg) {
4747
// eslint-disable-next-line consistent-this
4848
const self = this;
4949

@@ -70,7 +70,7 @@ exports.process = function ProcessAction(msg, cfg) {
7070
rowCount = 0;
7171

7272
console.log('Emitting message %j', messageToEmit);
73-
self.emit('data', messageToEmit);
73+
yield self.emit('data', messageToEmit);
7474

7575
yield exports.init(cfg);
7676
});
@@ -85,5 +85,5 @@ exports.process = function ProcessAction(msg, cfg) {
8585
console.log(`Writing Row: ${JSON.stringify(row)}`);
8686
stringifier.write(row);
8787
rowCount++;
88-
this.emit('end');
88+
await this.emit('end');
8989
};

lib/read.js

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const messages = require('elasticio-node').messages;
66
const moment = require('moment');
77
const debug = require('debug')('csv');
88
const request = require('request');
9+
const { Writable } = require('stream');
910

1011
const formatters = {};
1112

@@ -56,7 +57,7 @@ function createRowMessage(row, columns) {
5657
return messages.newMessageWithBody(body);
5758
}
5859

59-
function ProcessRead(msg, cfg) {
60+
async function ProcessRead(msg, cfg) {
6061
let csvURL = cfg.url;
6162
const that = this;
6263
let index = 0;
@@ -73,49 +74,61 @@ function ProcessRead(msg, cfg) {
7374
} else {
7475
console.error('URL of the CSV is missing');
7576
that.emit('error', 'URL of the CSV is missing');
76-
return that.emit('end');
77+
return await that.emit('end');
7778
}
7879
}
7980
const parser = csv.parse({
8081
delimiter: separator
8182
});
8283

83-
parser.on('readable', function onReadable() {
84-
let record;
85-
// eslint-disable-next-line no-cond-assign
86-
while (record = parser.read()) {
87-
debug('Have got a row #%s', index);
84+
let ended = false;
85+
86+
class CsvWriter extends Writable {
87+
async write(chunk, encoding, callback) {
88+
parser.pause();
89+
debug('Processing %d row...', index);
90+
debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);
91+
8892
if (index >= startRow) {
89-
const msg = createRowMessage(record, cfg.reader.columns);
90-
that.emit('data', msg);
93+
const msg = createRowMessage(chunk, cfg.reader.columns);
94+
await that.emit('data', msg);
9195
} else {
9296
debug('Row #%s is skipped based on configuration', index);
9397
}
9498
index++;
99+
parser.resume();
95100
}
96-
});
97101

98-
parser.on('finish', function fireEnd() {
99-
debug('Number of lines: ' + index);
100-
that.emit('end');
101-
});
102+
async end(chunk, encoding, callback) {
103+
debug('Processing csv writer end event...');
104+
debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);
102105

103-
parser.on('error', function emitError(error) {
104-
debug('Error reported by CSV read', error);
105-
that.emit('error', error);
106-
that.emit('end');
107-
});
106+
debug('Number of lines: ' + index);
107+
await that.emit('end');
108+
ended = true;
109+
}
110+
}
111+
112+
const writer = new CsvWriter();
108113

109114
debug('Sending GET request to url=%s', csvURL);
110115
request.get(csvURL)
111-
.on('response', function onResponse(response) {
116+
.on('response', async function onResponse(response) {
112117
debug('Have got response status=%s headers=%j', response.statusCode, response.headers);
113118
if (response.statusCode !== 200) {
114-
that.emit('error', 'Unexpected response code code=' + response.statusCode);
119+
await that.emit('error', 'Unexpected response code code=' + response.statusCode);
120+
ended = true;
115121
throw Error('Unexpected response code code=' + response.statusCode);
116122
}
117123
})
118-
.pipe(parser);
124+
.pipe(parser).pipe(writer);
125+
126+
// Need to wait for processing all messages
127+
while (!ended) {
128+
await new Promise((resolve, reject) => {
129+
setTimeout(() => { resolve(); }, 0);
130+
});
131+
}
119132
}
120133

121134
exports.process = ProcessRead;

0 commit comments

Comments
 (0)