Skip to content
This repository was archived by the owner on Jun 24, 2024. It is now read-only.

Commit 088bd18

Browse files
Walkerdigitalsadhu
authored andcommitted
feat(reader): Add implementation with tests
1 parent aba27e7 commit 088bd18

File tree

13 files changed

+786
-137
lines changed

13 files changed

+786
-137
lines changed

.eslintignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
coverage

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ tmp/**/*
66
.idea/**/*
77
*.iml
88
*.log
9+
coverage

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ language: node_js
22

33
node_js:
44
- "8"
5-
- "6"
65

76
script:
87
- npm test

lib/reader.js

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
const { Readable, PassThrough, Transform } = require('stream');
2+
const { stringify, parse } = require('JSONStream');
3+
const mergeStream = require('merge-stream');
4+
const { dedupe, sort, setOrder } = require('./util');
5+
const assert = require('assert');
6+
7+
module.exports = class Reader extends Readable {
8+
constructor(streams) {
9+
super();
10+
11+
assert(
12+
streams,
13+
`Expected first argument to new Reader() to be a stream or array of streams.
14+
Instead got ${typeof stream}`
15+
);
16+
17+
if (!Array.isArray(streams)) {
18+
streams = [streams];
19+
}
20+
21+
assert(
22+
streams.length,
23+
`Expected at least one stream to be provided to new Reader(). Got none.`
24+
);
25+
26+
const merged = mergeStream();
27+
28+
let count = 0;
29+
streams.forEach((readStream, index) => {
30+
const tmpStream = new PassThrough({ objectMode: true });
31+
merged.add(tmpStream);
32+
33+
readStream.on('file found', file => {
34+
this.emit('file found', file);
35+
readStream
36+
.pipe(parse('*'))
37+
.on('error', err => {
38+
this.emit('error', err);
39+
})
40+
.pipe(setOrder(index))
41+
.pipe(tmpStream);
42+
43+
count++;
44+
if (count === streams.length) {
45+
this.emit('pipeline ready');
46+
}
47+
});
48+
49+
readStream.on('file not found', file => {
50+
this.emit('file not found', file);
51+
tmpStream.end();
52+
53+
count++;
54+
if (count === streams.length) {
55+
this.emit('pipeline ready');
56+
}
57+
});
58+
});
59+
60+
this.data = merged.pipe(dedupe()).pipe(sort());
61+
this.data.pause();
62+
63+
this.data.on('data', chunk => {
64+
this.push(chunk.content);
65+
});
66+
67+
this.data.on('end', () => {
68+
this.push(null);
69+
});
70+
}
71+
72+
_read(size) {
73+
return this.data.resume();
74+
}
75+
};

lib/util.js

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
const { Transform } = require('stream');
2+
3+
function compareByOrder(a, b) {
4+
if (a.order === b.order) return 0;
5+
return a.order > b.order ? 1 : -1;
6+
}
7+
8+
class Dedupe extends Transform {
9+
constructor() {
10+
super({
11+
objectMode: true
12+
});
13+
14+
this.rows = new Map();
15+
}
16+
17+
_transform(chunk, enc, callback) {
18+
if (chunk && chunk.id) {
19+
this.rows.set(chunk.id, chunk);
20+
}
21+
22+
callback();
23+
}
24+
25+
_flush(callback) {
26+
Array.from(this.rows.values()).forEach(row => this.push(row));
27+
callback();
28+
}
29+
}
30+
31+
class SetOrder extends Transform {
32+
constructor(index) {
33+
super({
34+
objectMode: true
35+
});
36+
37+
this.index = index;
38+
}
39+
40+
_transform(chunk, enc, callback) {
41+
chunk.order = this.index;
42+
callback(null, chunk);
43+
}
44+
}
45+
46+
class Sort extends Transform {
47+
constructor() {
48+
super({
49+
objectMode: true
50+
});
51+
52+
this.rows = new Map();
53+
}
54+
55+
_transform(chunk, enc, callback) {
56+
if (
57+
chunk &&
58+
chunk.order !== null &&
59+
typeof chunk.order !== 'undefined'
60+
) {
61+
this.rows.set(chunk.order, chunk);
62+
}
63+
callback();
64+
}
65+
66+
_flush(callback) {
67+
Array.from(this.rows.values())
68+
.sort(compareByOrder)
69+
.forEach(row => this.push(row));
70+
callback();
71+
}
72+
}
73+
74+
module.exports = {
75+
dedupe() {
76+
return new Dedupe();
77+
},
78+
setOrder(index) {
79+
return new SetOrder(index);
80+
},
81+
sort() {
82+
return new Sort();
83+
},
84+
compareByOrder
85+
};

0 commit comments

Comments
 (0)