@@ -35,7 +35,7 @@ const {
3535const { exitCodes : { kUnsettledTopLevelAwait } } = internalBinding ( 'errors' ) ;
3636const { URL } = require ( 'internal/url' ) ;
3737const { canParse : URLCanParse } = internalBinding ( 'url' ) ;
38- const { receiveMessageOnPort } = require ( 'worker_threads' ) ;
38+ const { receiveMessageOnPort, isMainThread } = require ( 'worker_threads' ) ;
3939const {
4040 isAnyArrayBuffer,
4141 isArrayBufferView,
@@ -482,6 +482,8 @@ class HooksProxy {
482482 */
483483 #worker;
484484
485+ #portToHooksThread;
486+
485487 /**
486488 * The last notification ID received from the worker. This is used to detect
487489 * if the worker has already sent a notification before putting the main
@@ -499,26 +501,38 @@ class HooksProxy {
499501 #isReady = false ;
500502
501503 constructor ( ) {
502- const { InternalWorker } = require ( 'internal/worker' ) ;
503- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
504-
504+ const { InternalWorker, hooksPort } = require ( 'internal/worker' ) ;
505505 const lock = new SharedArrayBuffer ( SHARED_MEMORY_BYTE_LENGTH ) ;
506506 this . #lock = new Int32Array ( lock ) ;
507507
508- this . #worker = new InternalWorker ( loaderWorkerId , {
509- stderr : false ,
510- stdin : false ,
511- stdout : false ,
512- trackUnmanagedFds : false ,
513- workerData : {
514- lock,
515- } ,
516- } ) ;
517- this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
518- this . #worker. on ( 'exit' , process . exit ) ;
508+ if ( isMainThread ) {
509+ // Main thread is the only one that creates the internal single hooks worker
510+ this . #worker = new InternalWorker ( loaderWorkerId , {
511+ stderr : false ,
512+ stdin : false ,
513+ stdout : false ,
514+ trackUnmanagedFds : false ,
515+ workerData : {
516+ lock,
517+ } ,
518+ } ) ;
519+ this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
520+ this . #worker. on ( 'exit' , process . exit ) ;
521+ this . #portToHooksThread = this . #worker;
522+ } else {
523+ this . #portToHooksThread = hooksPort ;
524+ }
519525 }
520526
521527 waitForWorker ( ) {
528+ // There is one Hooks instance for each worker thread. But only one of these Hooks instances
529+ // has an InternalWorker. That was the Hooks instance created for the main thread.
530+ // It means for all Hooks instances that are not on the main thread => they are ready because they
531+ // delegate to the single InternalWorker anyway.
532+ if ( ! isMainThread ) {
533+ return ;
534+ }
535+
522536 if ( ! this . #isReady) {
523537 const { kIsOnline } = require ( 'internal/worker' ) ;
524538 if ( ! this . #worker[ kIsOnline ] ) {
@@ -535,6 +549,37 @@ class HooksProxy {
535549 }
536550 }
537551
552+ #postMessageToWorker( method , type , transferList , args ) {
553+ this . waitForWorker ( ) ;
554+
555+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
556+
557+ const {
558+ port1 : fromHooksThread ,
559+ port2 : toHooksThread ,
560+ } = new MessageChannel ( ) ;
561+
562+ // Pass work to the worker.
563+ debug ( `post ${ type } message to worker` , { method, args, transferList } ) ;
564+ const usedTransferList = [ toHooksThread ] ;
565+ if ( transferList ) {
566+ ArrayPrototypePushApply ( usedTransferList , transferList ) ;
567+ }
568+
569+ this . #portToHooksThread. postMessage (
570+ {
571+ __proto__ : null ,
572+ args,
573+ lock : this . #lock,
574+ method,
575+ port : toHooksThread ,
576+ } ,
577+ usedTransferList ,
578+ ) ;
579+
580+ return fromHooksThread ;
581+ }
582+
538583 /**
539584 * Invoke a remote method asynchronously.
540585 * @param {string } method Method to invoke
@@ -543,22 +588,7 @@ class HooksProxy {
543588 * @returns {Promise<any> }
544589 */
545590 async makeAsyncRequest ( method , transferList , ...args ) {
546- this . waitForWorker ( ) ;
547-
548- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
549- const asyncCommChannel = new MessageChannel ( ) ;
550-
551- // Pass work to the worker.
552- debug ( 'post async message to worker' , { method, args, transferList } ) ;
553- const finalTransferList = [ asyncCommChannel . port2 ] ;
554- if ( transferList ) {
555- ArrayPrototypePushApply ( finalTransferList , transferList ) ;
556- }
557- this . #worker. postMessage ( {
558- __proto__ : null ,
559- method, args,
560- port : asyncCommChannel . port2 ,
561- } , finalTransferList ) ;
591+ const fromHooksThread = this . #postMessageToWorker( method , 'Async' , transferList , args ) ;
562592
563593 if ( this . #numberOfPendingAsyncResponses++ === 0 ) {
564594 // On the next lines, the main thread will await a response from the worker thread that might
@@ -567,7 +597,11 @@ class HooksProxy {
567597 // However we want to keep the process alive until the worker thread responds (or until the
568598 // event loop of the worker thread is also empty), so we ref the worker until we get all the
569599 // responses back.
570- this . #worker. ref ( ) ;
600+ if ( this . #worker) {
601+ this . #worker. ref ( ) ;
602+ } else {
603+ this . #portToHooksThread. ref ( ) ;
604+ }
571605 }
572606
573607 let response ;
@@ -576,18 +610,26 @@ class HooksProxy {
576610 await AtomicsWaitAsync ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) . value ;
577611 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
578612
579- response = receiveMessageOnPort ( asyncCommChannel . port1 ) ;
613+ response = receiveMessageOnPort ( fromHooksThread ) ;
580614 } while ( response == null ) ;
581615 debug ( 'got async response from worker' , { method, args } , this . #lock) ;
582616
583617 if ( -- this . #numberOfPendingAsyncResponses === 0 ) {
584618 // We got all the responses from the worker, its job is done (until next time).
585- this . #worker. unref ( ) ;
619+ if ( this . #worker) {
620+ this . #worker. unref ( ) ;
621+ } else {
622+ this . #portToHooksThread. unref ( ) ;
623+ }
624+ }
625+
626+ if ( response . message . status === 'exit' ) {
627+ process . exit ( response . message . body ) ;
586628 }
587629
588- const body = this . #unwrapMessage ( response ) ;
589- asyncCommChannel . port1 . close ( ) ;
590- return body ;
630+ fromHooksThread . close ( ) ;
631+
632+ return this . #unwrapMessage ( response ) ;
591633 }
592634
593635 /**
@@ -598,11 +640,7 @@ class HooksProxy {
598640 * @returns {any }
599641 */
600642 makeSyncRequest ( method , transferList , ...args ) {
601- this . waitForWorker ( ) ;
602-
603- // Pass work to the worker.
604- debug ( 'post sync message to worker' , { method, args, transferList } ) ;
605- this . #worker. postMessage ( { __proto__ : null , method, args } , transferList ) ;
643+ const fromHooksThread = this . #postMessageToWorker( method , 'Sync' , transferList , args ) ;
606644
607645 let response ;
608646 do {
@@ -611,14 +649,17 @@ class HooksProxy {
611649 AtomicsWait ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) ;
612650 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
613651
614- response = this . #worker . receiveMessageSync ( ) ;
652+ response = receiveMessageOnPort ( fromHooksThread ) ;
615653 } while ( response == null ) ;
616654 debug ( 'got sync response from worker' , { method, args } ) ;
617655 if ( response . message . status === 'never-settle' ) {
618656 process . exit ( kUnsettledTopLevelAwait ) ;
619657 } else if ( response . message . status === 'exit' ) {
620658 process . exit ( response . message . body ) ;
621659 }
660+
661+ fromHooksThread . close ( ) ;
662+
622663 return this . #unwrapMessage( response ) ;
623664 }
624665
0 commit comments