@@ -2,6 +2,14 @@ import objectHash from "object-hash";
22
33
44export type Task = ( ) => Promise < any > ;
5+ interface PoolEntry {
6+ key : string ,
7+ data ?: any ,
8+ startedTime : Date ,
9+ completedTime ?: Date ,
10+ failedTime ?: Date ,
11+ error ?: any
12+ }
513
614
715/**
@@ -11,44 +19,118 @@ export type Task = () => Promise<any>;
1119 */
1220export class ProcessPool {
1321
14- private ongoing : string [ ] = [ ] ;
15- private completed : { [ hash : string ] : any } = { } ;
16- private failed : { [ hash : string ] : any } = { } ;
22+ private entries : PoolEntry [ ] = [ ] ;
1723
1824 public scheduleTask ( key : string , task : Task ) : { ticket : string } {
19- if ( this . ongoing . includes ( key ) ) return { ticket : key } ;
20- this . ongoing . push ( key ) ;
25+ if ( this . getOngoing ( ) . find ( e => e . key === key ) ) return { ticket : key } ;
26+ this . addNewEntry ( key ) ;
2127 task ( ) . then ( results => {
22- this . ongoing = this . ongoing . filter ( h => h !== key ) ;
23- this . completed [ key ] = results ;
28+ this . setCompleted ( key , results ) ;
2429 } ) . catch ( error => {
2530 console . error ( `An error occured while trying to execute task ${ key } : ` ) ;
2631 console . error ( error ) ;
27- this . ongoing = this . ongoing . filter ( h => h !== key ) ;
28- this . failed [ key ] = error ;
32+ this . setFailed ( key , error ) ;
2933 } ) ;
3034 return { ticket : key } ;
3135 }
3236
3337 poll ( ticket : string ) : { error : string } | { ticket : string } | { results : any } {
34- const error = this . failed [ ticket ] ;
38+ this . cleanOlderThan ( 24 * 60 * 60 , 3 * 24 * 60 * 60 ) ;
39+
40+ const entry = this . getEntry ( ticket ) ;
41+
42+ const error = entry . error ;
3543 if ( error ) {
36- delete this . failed [ ticket ] ;
3744 return {
3845 // Error objects are not properly stringified by default.
3946 error : JSON . stringify ( error , Object . getOwnPropertyNames ( error ) )
4047 } ;
4148 }
4249
43- const data = this . completed [ ticket ] ;
50+ const data = entry . data ;
4451 if ( data ) {
45- delete this . completed [ ticket ] ;
4652 return { results : data } ;
4753 }
48- if ( ! this . ongoing . includes ( ticket ) ) {
49- return { error : `No such ticket: ${ ticket } ` } ;
50- }
54+
5155 return { ticket : ticket } ;
5256 }
57+
58+ public cleanOlderThan ( maxAgeSeconds : number , abandonedAgeSeconds ?: number ) {
59+ const currentTime = new Date ( ) . getTime ( ) ;
60+ for ( const entry of this . entries ) {
61+
62+ if ( entry . completedTime ) {
63+ const deltaSecs = ( currentTime - entry . completedTime . getTime ( ) ) / 1000 ;
64+ if ( deltaSecs > maxAgeSeconds ) {
65+ console . log ( `Cleaning entry: ${ entry . key } because completed ${ deltaSecs } seconds ago.` ) ;
66+ this . removeEntry ( entry . key ) ;
67+ }
68+ }
69+
70+ if ( entry . failedTime ) {
71+ const deltaSecs = ( currentTime - entry . failedTime . getTime ( ) ) / 1000 ;
72+ if ( deltaSecs > maxAgeSeconds ) {
73+ console . log ( `Cleaning entry: ${ entry . key } because failed ${ deltaSecs } seconds ago.` ) ;
74+ this . removeEntry ( entry . key ) ;
75+ }
76+ }
77+
78+ // Additionally, removing entries that have been started but never finished in, say, a few days?
79+ if ( abandonedAgeSeconds ) {
80+ if ( ! entry . completedTime && ! entry . failedTime ) {
81+ const deltaSecs = ( currentTime - entry . startedTime . getTime ( ) ) / 1000 ;
82+ if ( deltaSecs > abandonedAgeSeconds ) {
83+ console . log ( `Cleaning entry: ${ entry . key } because unfinished since ${ deltaSecs } seconds.` ) ;
84+ this . removeEntry ( entry . key ) ;
85+ }
86+ }
87+ }
88+ }
89+ }
90+
91+ private setCompleted ( key : string , data : any ) : PoolEntry {
92+ const entry = this . getEntry ( key ) ;
93+ entry . completedTime = new Date ( ) ;
94+ entry . data = data ;
95+ return entry ;
96+ }
97+
98+ private setFailed ( key : string , error : any ) : PoolEntry {
99+ const entry = this . getEntry ( key ) ;
100+ entry . error = error ;
101+ entry . failedTime = new Date ( ) ;
102+ return entry ;
103+ }
104+
105+ private addNewEntry ( key : string ) : PoolEntry {
106+ const entry : PoolEntry = {
107+ key : key ,
108+ startedTime : new Date ( ) ,
109+ } ;
110+ this . entries . push ( entry ) ;
111+ return entry ;
112+ }
113+
114+ private removeEntry ( key : string ) {
115+ this . entries = this . entries . filter ( e => e . key !== key ) ;
116+ }
117+
118+ private getEntry ( key : string ) : PoolEntry {
119+ const entry = this . entries . find ( e => e . key === key ) ;
120+ if ( ! entry ) throw new Error ( `No such entry in pool: ${ key } ` ) ;
121+ return entry ;
122+ }
123+
124+ private getOngoing ( ) : PoolEntry [ ] {
125+ return this . entries . filter ( e => ! e . completedTime && ! e . error ) ;
126+ }
127+
128+ private getFailed ( ) : PoolEntry [ ] {
129+ return this . entries . filter ( e => e . error ) ;
130+ }
131+
132+ private getCompleted ( ) : PoolEntry [ ] {
133+ return this . entries . filter ( e => e . completedTime ) ;
134+ }
53135}
54136
0 commit comments