@@ -5,10 +5,21 @@ let REQ_PER_TIMEOUT = 5;
55let TIMEOUT_MILLS = 1000 ;
66const SAFE_TIMEOUT_BUFFER = 100 ;
77
8+ function generateRandomId ( ) {
9+ return Math . random ( ) . toString ( 16 ) . slice ( 2 ) ;
10+ }
11+
812export class BufferedRequest {
913 public static markAsRead = BufferedRequest . create ( ) ;
1014 public static markAsDelivered = BufferedRequest . create ( ) ;
1115
16+ public static updateMarkAsReadOptions ( reqPerTimeout : number , timeoutMills : number ) {
17+ BufferedRequest . markAsRead = BufferedRequest . create ( reqPerTimeout , timeoutMills ) ;
18+ }
19+ public static updateMarkAsDeliveredOptions ( reqPerTimeout : number , timeoutMills : number ) {
20+ BufferedRequest . markAsDelivered = BufferedRequest . create ( reqPerTimeout , timeoutMills ) ;
21+ }
22+
1223 public static get reqPerTimeout ( ) {
1324 return REQ_PER_TIMEOUT ;
1425 }
@@ -28,28 +39,33 @@ export class BufferedRequest {
2839 }
2940
3041 public static create ( reqPerTimeout = REQ_PER_TIMEOUT , timeoutMills = TIMEOUT_MILLS ) {
31- const waitQueue : Func [ ] = [ ] ;
32- const nextQueue : Func [ ] = [ ] ;
42+ const waitQueue = new Map < string , Func > ( ) ;
43+ const nextQueue = new Map < string , Func > ( ) ;
3344
3445 let state : State = 'idle' ;
3546 let timeout : NodeJS . Timeout | undefined ;
3647
3748 return {
38- push ( func : Func ) {
39- waitQueue . push ( func ) ;
49+ push ( func : Func , lane ?: string ) {
50+ waitQueue . set ( lane ?? generateRandomId ( ) , func ) ;
4051 this . invoke ( ) ;
4152 } ,
4253 shift ( ) {
43- if ( nextQueue . length < reqPerTimeout ) {
44- const nextRemains = Math . min ( reqPerTimeout - nextQueue . length , waitQueue . length ) ;
54+ if ( nextQueue . size < reqPerTimeout ) {
55+ const nextRemains = Math . min ( reqPerTimeout - nextQueue . size , waitQueue . size ) ;
56+ const lanes = [ ...waitQueue . keys ( ) ] ;
4557 for ( let n = 0 ; n < nextRemains ; n ++ ) {
46- const func = waitQueue . shift ( ) ;
47- if ( func ) nextQueue . push ( func ) ;
58+ const lane = lanes [ n ] ;
59+ const func = waitQueue . get ( lane ) ;
60+ if ( func ) {
61+ waitQueue . delete ( lane ) ;
62+ nextQueue . set ( lane , func ) ;
63+ }
4864 }
4965 }
5066 } ,
5167 handleIdle ( ) {
52- if ( 0 < nextQueue . length ) {
68+ if ( 0 < nextQueue . size ) {
5369 state = 'processing' ;
5470 this . invoke ( ) ;
5571 }
@@ -59,15 +75,21 @@ export class BufferedRequest {
5975
6076 timeout = setTimeout ( ( ) => {
6177 timeout = undefined ;
62- if ( 0 < nextQueue . length || 0 < waitQueue . length ) {
78+ if ( 0 < nextQueue . size || 0 < waitQueue . size ) {
6379 this . invoke ( ) ;
6480 } else {
6581 state = 'idle' ;
6682 }
6783 } , timeoutMills + SAFE_TIMEOUT_BUFFER ) ;
6884
69- nextQueue . forEach ( ( func ) => func ( ) ) ;
70- nextQueue . length = 0 ;
85+ nextQueue . forEach ( async ( func , lane ) => {
86+ try {
87+ await func ( ) ;
88+ } catch ( e ) {
89+ waitQueue . set ( lane , func ) ;
90+ }
91+ } ) ;
92+ nextQueue . clear ( ) ;
7193 } ,
7294 async invoke ( ) {
7395 this . shift ( ) ;
0 commit comments