Skip to content

Commit 0e898d2

Browse files
committed
enhance the example
1 parent 97ec5f5 commit 0e898d2

File tree

1 file changed

+70
-27
lines changed

1 file changed

+70
-27
lines changed

README.md

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,57 +33,100 @@ npm install promise-blocking-queue
3333

3434
## Usage
3535

36-
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB.
37-
Also, let's assume that our DB instance it very cheap, and as such we don't want to load it to much, so we only want to handle
38-
100 concurrent DB insert operations.
39-
We can achieve a short scalable solution like:
36+
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB.
37+
Also, let's assume that our DB instance it very cheap, and as such we don't want to load it too much, so we only want to handle
38+
2 concurrent DB insert operations.
39+
40+
We can achieve a short scalable solution like so:
4041

4142
```typescript
4243
import * as JSONStream from 'JSONStream';
4344
import * as fs from 'fs';
44-
import * as _ from 'underscore';
4545
import * as es from 'event-stream';
4646
import { BlockingQueue } from 'promise-blocking-queue';
4747

48-
const queue = new BlockingQueue({ concurrency: 100 });
49-
let count = 0;
48+
const queue = new BlockingQueue({ concurrency: 2 });
49+
let handled = 0;
50+
let failed = 0;
5051

5152
const mapper = (user, cb) => {
52-
queue.enqueue(() => {
53+
console.log('streamed', user.username);
54+
const qResult = queue.enqueue(() => {
55+
console.log('adding', user.username);
5356
// Add user to DB
54-
return Promise.resolve();
55-
}).enqueuePromise
56-
.then(() => {
57-
console.log('handled', count++);
58-
cb();
59-
})
60-
.catch((err) => {
61-
cb(err);
57+
return new Promise((resolve) => {
58+
setTimeout(() => {
59+
console.log('added', user.username);
60+
resolve();
61+
}, (handled + 1) * 100);
62+
}).then(() => {
63+
console.log('handled', ++handled);
6264
});
65+
});
66+
qResult.fnPromise.catch(() => {
67+
console.log('failed', ++failed);
68+
});
69+
// Continue streaming only after current item handling starts
70+
qResult.enqueuePromise.then(cb, cb);
6371
return false;
6472
};
6573

66-
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
67-
const jsonReadStream = JSONStream.parse('*');
68-
const mapStream = es.map(mapper);
69-
70-
readStream
71-
.pipe(jsonReadStream)
72-
.pipe(mapStream)
73-
.on('data', _.noop)
74+
fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' })
75+
.pipe(JSONStream.parse('*'))
76+
.pipe(es.map(mapper))
7477
.on('error', (err) => {
7578
console.log('error streaming', err);
76-
process.exit(1);
7779
})
7880
.on('end', () => {
7981
console.log('done streaming');
8082
queue.on('idle', () => {
81-
console.log('done processing', count);
82-
process.exit(0);
83+
console.log(`done processing - ${handled} handled, ${failed} failed`);
8384
});
8485
});
8586
```
8687

88+
If `users.json` is like:
89+
90+
```json
91+
[
92+
{
93+
"username": "a"
94+
},
95+
{
96+
"username": "b"
97+
},
98+
{
99+
"username": "c"
100+
},
101+
{
102+
"username": "d"
103+
}
104+
]
105+
```
106+
107+
Output will be:
108+
109+
```bash
110+
streamed a
111+
adding a
112+
streamed b
113+
adding b
114+
streamed c // c now waits in line to start and streaming is paused until then
115+
added a
116+
handled 1
117+
streamed d // d only get streamed after c has a spot in the queue
118+
adding c // c only get handled after a is done
119+
added b
120+
handled 2
121+
adding d // d only get handled after b is done
122+
done streaming
123+
added c
124+
handled 3
125+
added d
126+
handled 4
127+
done processing - 4 handled, 0 failed
128+
```
129+
87130
## API
88131

89132
### BlockingQueue(options)

0 commit comments

Comments
 (0)