@@ -21,7 +21,7 @@ If we use [p-queue](https://github.com/sindresorhus/p-limit) (by the amazing [si
 for example, we can utilize streams to avoid memory bloat, but we have no (easy) way to control
 the stream flow without hitting that Out Of Memory Exception.
 
-The solution - a blocking queue that returns a promise that will be resolved when the added item gain an available slot in the
+The solution - a blocking queue that returns a promise that will be resolved when the added item gains an available slot in the
 queue, thus, allowing us to pause the stream consumption until there is a _real_ need to consume the next item - keeping us
 memory smart while maintaining the concurrency level of data handling.
 
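To keep the wording above concrete, here is a minimal sketch of the enqueue flow, using only the `BlockingQueue` API that the usage example below relies on (`enqueue` returns an object with `enqueuePromise` and `fnPromise`); the `saveToDB` handler is a hypothetical stand-in:

```ts
import { BlockingQueue } from 'promise-blocking-queue';

// Hypothetical handler standing in for the real per-item work
const saveToDB = async (item: { username: string }): Promise<void> => {
  // ... write the item to the database
};

const queue = new BlockingQueue({ concurrency: 2 });

const consumeNext = async (item: { username: string }) => {
  const { enqueuePromise, fnPromise } = queue.enqueue(() => saveToDB(item));
  // enqueuePromise resolves once the item gains a slot in the queue -
  // only then do we pull the next item from the stream, keeping memory bounded
  await enqueuePromise;
  // fnPromise settles when the handler itself finishes (or rejects)
  fnPromise.catch((err) => console.error('item failed', err));
};
```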
@@ -31,7 +31,7 @@ memory smart while maintaining concurrency level of data handling.
 npm install promise-blocking-queue
 ```
 
-## Usage
+## Usage example
 
 Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB.
 Also, let's assume that our DB instance is very cheap, and as such we don't want to load it too much, so we only want to handle
@@ -43,29 +43,31 @@ We can achieve a short scalable solution like so:
 import * as JSONStream from 'JSONStream';
 import * as fs from 'fs';
 import * as es from 'event-stream';
+import * as sleep from 'sleep-promise';
 import { BlockingQueue } from 'promise-blocking-queue';
 
 const queue = new BlockingQueue({ concurrency: 2 });
 let handled = 0;
 let failed = 0;
 
+const logFailed = () => {
+  console.log(`failed ${++failed}`);
+};
+
+const logAddedUser = (username) => () => {
+  console.log(`added ${username} #${++handled}`);
+};
+
+const addUserToDB = (user) => {
+  console.log(`adding ${user.username}`);
+  // Simulate long running task
+  return sleep((handled + 1) * 100).then(logAddedUser(user.username));
+};
+
 const mapper = (user, cb) => {
-  console.log('streamed', user.username);
-  const qResult = queue.enqueue(() => {
-    console.log('adding', user.username);
-    // Add user to DB
-    return new Promise((resolve) => {
-      setTimeout(() => {
-        console.log('added', user.username);
-        resolve();
-      }, (handled + 1) * 100);
-    }).then(() => {
-      console.log('handled', ++handled);
-    });
-  });
-  qResult.fnPromise.catch(() => {
-    console.log('failed', ++failed);
-  });
+  console.log(`streamed ${user.username}`);
+  const qResult = queue.enqueue(addUserToDB, user);
+  qResult.fnPromise.catch(logFailed);
   // Continue streaming only after current item handling starts
   qResult.enqueuePromise.then(cb, cb);
   return false;
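The hunk above ends before the stream wiring, so for completeness, here is one plausible way such a mapper gets plugged into a pipeline using the imports shown (`fs`, `JSONStream`, `event-stream`); this is a sketch under that assumption, not the exact code from the README:

```ts
// Sketch only: wire the mapper into the JSON stream so that back-pressure
// is driven by enqueuePromise (the mapper calls cb only once the item has a slot)
fs.createReadStream('users.json')
  .pipe(JSONStream.parse('*'))      // emit one user object at a time
  .pipe(es.map(mapper))
  .on('end', () => console.log('done streaming'));
```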
@@ -112,18 +114,14 @@ adding a
 streamed b
 adding b
 streamed c // c now waits in line to start and streaming is paused until then
-added a
-handled 1
+added a #1
 streamed d // d only gets streamed after c has a spot in the queue
-adding c // c only get handled after a is done
-added b
-handled 2
-adding d // d only get handled after b is done
+adding c // c only gets handled after a is done
+added b #2
+adding d // d only gets handled after b is done
 done streaming
-added c
-handled 3
-added d
-handled 4
+added c #3
+added d #4
 done processing - 4 handled, 0 failed
 ```
 