33import { EventEmitter } from 'events' ;
44import { StrictEventEmitter } from './EventEmitterTypesHelper' ;
55import { IBlockingQueueOptions , IEnqueueResult , QueueFn } from './types' ;
6+ import * as LinkedList from 'linked-list' ;
7+ import { Item } from 'linked-list' ;
68
79interface IBlockingQueueEvents {
810 idle : void ;
@@ -18,6 +20,12 @@ interface IPromiseParts<T> {
1820 resolve : PromiseResolve < T > ;
1921}
2022
23+ class Node < T , K extends any [ ] > extends Item {
24+ constructor ( public item : IQueueItem < T , K > ) {
25+ super ( ) ;
26+ }
27+ }
28+
2129interface IQueueItem < T , K extends any [ ] > {
2230 enqueueResolve : PromiseResolve < void > ;
2331 fnResolve : PromiseResolve < T > ;
@@ -28,9 +36,10 @@ interface IQueueItem<T, K extends any[]> {
2836export class BlockingQueue extends ( EventEmitter as new ( ) => MessageEmitter ) {
2937
3038 private readonly _options : IBlockingQueueOptions ;
31- private readonly _queue : Array < IQueueItem < any , any > > = [ ] ;
39+ private readonly _queue = new LinkedList < Node < any , any > > ( ) ;
3240 private readonly _boundNext : any ;
3341 private _activeCount : number = 0 ;
42+ private _pendingCount : number = 0 ;
3443
3544 constructor ( options : IBlockingQueueOptions ) {
3645 super ( ) ;
@@ -62,7 +71,8 @@ export class BlockingQueue extends (EventEmitter as new() => MessageEmitter) {
6271 if ( this . activeCount < this . _options . concurrency ) {
6372 this . _run ( item ) ;
6473 } else {
65- this . _queue . push ( item ) ;
74+ this . _queue . append ( new Node ( item ) ) ;
75+ this . _pendingCount ++ ;
6676 }
6777 return {
6878 enqueuePromise : enqueuePromiseParts . promise ,
@@ -75,15 +85,17 @@ export class BlockingQueue extends (EventEmitter as new() => MessageEmitter) {
7585 }
7686
7787 public get pendingCount ( ) : number {
78- return this . _queue . length ;
88+ return this . _pendingCount ;
7989 }
8090
8191 private _next ( ) {
8292 this . _activeCount -- ;
8393
84- const item = this . _queue . shift ( ) ;
85- if ( item ) {
86- this . _run ( item ) ;
94+ const node = this . _queue . head ;
95+ if ( node ) {
96+ node . detach ( ) ;
97+ this . _pendingCount -- ;
98+ this . _run ( node . item ) ;
8799 } else {
88100 this . emit ( 'empty' ) ;
89101 if ( this . _activeCount === 0 ) {
0 commit comments