Skip to content

Commit 2340bad

Browse files
committed
enhance the example with writing results as well
1 parent 18af1e9 commit 2340bad

File tree

1 file changed

+53
-10
lines changed

1 file changed

+53
-10
lines changed

README.md

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,18 @@ const queue = new BlockingQueue({ concurrency: 2 });
5050
let handled = 0;
5151
let failed = 0;
5252

53+
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
54+
const jsonReadStream = JSONStream.parse('*');
55+
const jsonWriteStream = JSONStream.stringify();
56+
const writeStream = fs.createWriteStream('./results.json');
57+
5358
const logFailed = () => {
5459
console.log(`failed ${++failed}`);
5560
};
5661

5762
const logAddedUser = (username) => () => {
5863
console.log(`added ${username} #${++handled}`);
64+
jsonWriteStream.write(username);
5965
};
6066

6167
const addUserToDB = (user) => {
@@ -73,18 +79,41 @@ const mapper = (user, cb) => {
7379
return false;
7480
};
7581

76-
fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' })
77-
.pipe(JSONStream.parse('*'))
82+
// tslint:disable-next-line:no-empty
83+
const noop = () => {};
84+
85+
const onReadEnd = () => {
86+
console.log('done read streaming');
87+
// Wait until all work is done
88+
queue.on('idle', () => {
89+
jsonWriteStream.end();
90+
});
91+
};
92+
93+
const onWriteEnd = () => {
94+
console.log(`done processing - ${handled} handled, ${failed} failed`);
95+
process.exit(0);
96+
};
97+
98+
jsonWriteStream
99+
.pipe(writeStream)
100+
.on('error', (err) => {
101+
console.log('error wrtie streaming', err);
102+
process.exit(1);
103+
})
104+
.on('end', onWriteEnd)
105+
.on('finish', onWriteEnd);
106+
107+
readStream
108+
.pipe(jsonReadStream)
78109
.pipe(es.map(mapper))
110+
.on('data', noop)
79111
.on('error', (err) => {
80-
console.log('error streaming', err);
112+
console.log('error read streaming', err);
113+
process.exit(1);
81114
})
82-
.on('end', () => {
83-
console.log('done streaming');
84-
queue.on('idle', () => {
85-
console.log(`done processing - ${handled} handled, ${failed} failed`);
86-
});
87-
});
115+
.on('finish', onReadEnd)
116+
.on('end', onReadEnd);
88117
```
89118

90119
If `users.json` is like:
@@ -119,12 +148,26 @@ streamed d // d only get streamed after c has a spot in the queue
119148
adding c // c only gets handled after a is done
120149
added b #2
121150
adding d // d only gets handled after b is done
122-
done streaming
151+
done read streaming
123152
added c #3
124153
added d #4
125154
done processing - 4 handled, 0 failed
126155
```
127156

157+
`results.json` will be:
158+
159+
```json
160+
[
161+
"a"
162+
,
163+
"b"
164+
,
165+
"c"
166+
,
167+
"d"
168+
]
169+
```
170+
128171
## API
129172

130173
### BlockingQueue(options)

0 commit comments

Comments
 (0)