Skip to content

Commit c805dcb

Browse files
committed
Improve writing logic. Add unit test basement
1 parent 2df4cde commit c805dcb

File tree

5 files changed

+130
-48
lines changed

5 files changed

+130
-48
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ build/Release
2727

2828
# Dependency directory
2929
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
30-
node_modules
30+
node_modules
31+
32+
.env

lib/actions/write.js

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ let rowCount = 0;
1919
let ax;
2020
let putUrl;
2121

22-
const resolveList = [];
23-
2422
exports.init = async function init(cfg) {
2523
const delimiter = cfg.writer.separator || ',';
2624
const header = cfg.includeHeaders !== 'No';
@@ -41,7 +39,7 @@ exports.init = async function init(cfg) {
4139
signedUrl = await client.resources.storage.createSignedUrl();
4240
// signedUrl = {
4341
// put_url: 'https://examlple.mock/putUrl',
44-
// get_url: 'https://examlple.mock/putUrl',
42+
// get_url: 'https://examlple.mock/getUrl',
4543
// };
4644
putUrl = signedUrl.put_url;
4745
console.log('CSV file to be uploaded file to uri=%s', putUrl);
@@ -55,53 +53,45 @@ exports.process = async function ProcessAction(msg, cfg) {
5553

5654
if (timeout) {
5755
clearTimeout(timeout);
58-
resolveList.forEach((res) => {
59-
res();
60-
});
6156
}
6257

63-
const result = new Promise((resolve) => {
64-
resolveList.push(resolve);
58+
timeout = setTimeout(async () => {
59+
console.log('Closing the stream due to inactivity');
6560

66-
timeout = setTimeout(async () => {
67-
console.log('Closing the stream due to inactivity');
68-
69-
const finalRowCount = rowCount;
70-
console.log('The resulting CSV file contains %s rows', finalRowCount);
71-
ax.put(putUrl, stringifier, {
72-
method: 'PUT',
73-
timeout: REQUEST_TIMEOUT,
74-
retry: REQUEST_MAX_RETRY,
75-
delay: REQUEST_RETRY_DELAY,
76-
maxContentLength: REQUEST_MAX_CONTENT_LENGTH,
77-
});
78-
stringifier.end();
79-
const messageToEmit = messages.newMessageWithBody({
80-
rowCount: finalRowCount,
81-
});
82-
const fileName = `${messageToEmit.id}.csv`;
83-
messageToEmit.attachments[fileName] = {
84-
'content-type': 'text/csv',
85-
url: signedUrl.get_url,
86-
};
87-
signedUrl = null;
88-
rowCount = 0;
89-
console.log('Emitting message %j', messageToEmit);
90-
await self.emit('data', messageToEmit);
91-
await exports.init(cfg);
92-
resolve();
93-
}, TIMEOUT_BETWEEN_EVENTS);
61+
const finalRowCount = rowCount;
62+
console.log('The resulting CSV file contains %s rows', finalRowCount);
63+
ax.put(putUrl, stringifier, {
64+
method: 'PUT',
65+
timeout: REQUEST_TIMEOUT,
66+
retry: REQUEST_MAX_RETRY,
67+
delay: REQUEST_RETRY_DELAY,
68+
maxContentLength: REQUEST_MAX_CONTENT_LENGTH,
69+
});
70+
stringifier.end();
71+
const messageToEmit = messages.newMessageWithBody({
72+
rowCount: finalRowCount,
73+
});
74+
const fileName = `${messageToEmit.id}.csv`;
75+
messageToEmit.attachments[fileName] = {
76+
'content-type': 'text/csv',
77+
url: signedUrl.get_url,
78+
};
79+
signedUrl = null;
80+
rowCount = 0;
81+
console.log('Emitting message %j', messageToEmit);
82+
await self.emit('data', messageToEmit);
83+
await exports.init(cfg);
84+
}, TIMEOUT_BETWEEN_EVENTS);
9485

95-
let row = msg.body.writer;
96-
console.log(`Incoming data: ${JSON.stringify(row)}`);
97-
if (cfg.writer.columns) {
98-
const columns = Object.keys(_.keyBy(cfg.writer.columns, 'property'));
99-
row = _.pick(row, columns);
100-
}
101-
console.log(`Writing Row: ${JSON.stringify(row)}`);
102-
stringifier.write(row);
103-
rowCount += 1;
104-
});
86+
let row = msg.body.writer;
87+
console.log(`Incoming data: ${JSON.stringify(row)}`);
88+
if (cfg.writer.columns) {
89+
const columns = Object.keys(_.keyBy(cfg.writer.columns, 'property'));
90+
row = _.pick(row, columns);
91+
}
92+
console.log(`Writing Row: ${JSON.stringify(row)}`);
93+
stringifier.write(row);
94+
rowCount += 1;
10595

106-
await result;
96+
await self.emit('end');
10797
};

package-lock.json

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"devDependencies": {
4747
"chai": "4.1.1",
4848
"chai-as-promised": "7.1.1",
49+
"dotenv": "5.0.0",
4950
"eslint": "5.16.0",
5051
"eslint-config-airbnb-base": "^13.1.0",
5152
"eslint-plugin-import": "^2.17.3",

spec/write.spec.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/* eslint-disable no-unused-vars */
2+
const chai = require('chai');
3+
const chaiAsPromised = require('chai-as-promised');
4+
const fs = require('fs');
5+
const nock = require('nock');
6+
const sinon = require('sinon');
7+
8+
chai.use(chaiAsPromised);
9+
const { expect } = require('chai');
10+
11+
const write = require('../lib/actions/write.js');
12+
13+
describe('CSV Write component', function () {
14+
this.timeout(15000);
15+
16+
let emitter;
17+
let cfg;
18+
19+
beforeEach(() => {
20+
emitter = {
21+
emit: sinon.spy(),
22+
};
23+
});
24+
25+
before(() => {
26+
if (fs.existsSync('.env')) {
27+
// eslint-disable-next-line global-require
28+
require('dotenv').config();
29+
}
30+
31+
cfg = {
32+
writer: {
33+
columns: [
34+
{ property: 'header1' },
35+
{ property: 'header2' },
36+
],
37+
},
38+
};
39+
40+
nock('http://api-service.platform.svc.cluster.local.:9000/', { encodedQueryParams: true })
41+
.post('/v2/resources/storage/signed-url')
42+
.reply(200, { put_url: 'https://examlple.mock/putUrl', get_url: 'https://examlple.mock/getUrl' });
43+
44+
nock('https://examlple.mock')
45+
.put('/putUrl')
46+
.reply(200, {});
47+
});
48+
49+
it('should parse simple string rows', async (done) => {
50+
// await write.init(cfg);
51+
52+
const msg1 = {
53+
body: {
54+
writer: {
55+
columns: [
56+
{ property: 'text11' },
57+
{ property: 'text12' },
58+
],
59+
},
60+
},
61+
};
62+
// await write.process.call(emitter, msg1, cfg);
63+
64+
const msg2 = {
65+
body: {
66+
reader: {
67+
columns: [
68+
{ property: 'text21' },
69+
{ property: 'text22' },
70+
],
71+
},
72+
},
73+
};
74+
// await write.process.call(emitter, msg2, cfg);
75+
76+
// expect(emitter.emit.getCalls().length).to.equal(2);
77+
78+
// await new Promise(resolve => setTimeout(resolve, 12000));
79+
// expect(emitter.emit.getCalls().length).to.equal(3);
80+
81+
done();
82+
});
83+
});

0 commit comments

Comments
 (0)