11import { Worker } from 'worker_threads' ;
22import * as path from 'path' ;
33
4+ type TWorker = Worker & { isIdle : boolean } ;
5+
46/**
57 * Manages a pool of worker threads for parallel processing of image files.
68 */
79export class WorkerPool {
8- private workers : Worker [ ] = [ ] ;
10+ private workers : TWorker [ ] = [ ] ;
911 private taskQueue : { filePath : string ; options : Options } [ ] = [ ] ;
1012 private maxWorkers : number ;
13+ private completePromise ?: Promise < void > ;
14+ private completeResolve ?: ( ) => void ;
15+ private isComplete ( ) : boolean {
16+ return (
17+ this . taskQueue . length === 0 &&
18+ this . workers . every ( ( worker ) => worker . isIdle )
19+ ) ;
20+ }
21+
22+ /**
23+ * Terminate all workers in the pool.
24+ */
25+ public exitAll ( ) : void {
26+ this . workers . forEach ( ( worker ) => worker . terminate ( ) ) ;
27+ this . workers = [ ] ;
28+ }
29+
30+ /**
31+ * Returns a promise that resolves when all work is done.
32+ */
33+ public async allComplete ( ) : Promise < void > {
34+ if ( this . isComplete ( ) ) {
35+ return Promise . resolve ( ) ;
36+ }
37+
38+ if ( ! this . completePromise ) {
39+ this . completePromise = new Promise < void > ( ( resolve ) => {
40+ this . completeResolve = resolve ;
41+ } ) ;
42+ }
43+
44+ return this . completePromise ;
45+ }
1146
1247 /**
1348 * Creates a new WorkerPool instance.
@@ -25,18 +60,22 @@ export class WorkerPool {
2560 * @param options - Image processing options for the file.
2661 */
2762 private createWorker ( filePath : string , options : Options ) : void {
28- const worker = new Worker ( path . join ( __dirname , 'processImage.js' ) , {
29- workerData : { filePath, options } ,
30- } ) ;
63+ const worker = new Worker (
64+ path . join ( __dirname , 'processImage.js' ) ,
65+ ) as TWorker ;
66+
67+ worker . isIdle = false ;
68+ worker . postMessage ( { filePath, options } ) ;
3169
3270 // Listen for messages and errors from the worker
33- worker . on ( 'message' , ( message ) => {
34- console . log ( message ) ;
71+ worker . on ( 'message' , ( ) => {
72+ worker . isIdle = true ;
3573 this . processNextTask ( ) ;
3674 } ) ;
3775
3876 worker . on ( 'error' , ( err ) => {
3977 console . error ( `Error in worker for file ${ filePath } :` , err ) ;
78+ worker . isIdle = true ;
4079 this . processNextTask ( ) ;
4180 } ) ;
4281
@@ -49,7 +88,22 @@ export class WorkerPool {
4988 private processNextTask ( ) : void {
5089 const nextTask = this . taskQueue . shift ( ) ;
5190 if ( nextTask ) {
52- this . createWorker ( nextTask . filePath , nextTask . options ) ;
91+ if ( this . workers . length < this . maxWorkers ) {
92+ this . createWorker ( nextTask . filePath , nextTask . options ) ;
93+ } else {
94+ const worker = this . workers . find ( ( w ) => w . isIdle ) ;
95+ if ( worker ) {
96+ worker . isIdle = false ;
97+ worker . postMessage ( nextTask ) ;
98+ } else {
99+ // Something went wrong, there are no idle workers somehow
100+ throw Error ( 'Could not find an idle worker.' ) ;
101+ }
102+ }
103+ } else if ( this . isComplete ( ) && this . completeResolve ) {
104+ this . completeResolve ( ) ;
105+ this . completePromise = undefined ;
106+ this . completeResolve = undefined ;
53107 }
54108 }
55109
@@ -66,15 +120,4 @@ export class WorkerPool {
66120 this . taskQueue . push ( { filePath, options } ) ;
67121 }
68122 }
69-
70- /**
71- * Waits for all tasks to complete before exiting.
72- */
73- public waitForCompletion ( ) : void {
74- this . workers . forEach ( ( worker ) => {
75- worker . on ( 'exit' , ( ) => {
76- this . processNextTask ( ) ;
77- } ) ;
78- } ) ;
79- }
80123}
0 commit comments