@@ -411,42 +411,71 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
411411
412412 $ process = proc_open ($ command , $ mode , $ pipes );
413413
414+ $ output = '' ;
415+ $ error = '' ;
414416 if (is_resource ($ process )) {
417+ $ data = json_encode (self ::getDataMainThread ());
418+
419+ if (is_string ($ data )) fwrite ($ pipes [0 ], $ data );
420+ fclose ($ pipes [0 ]);
421+
415422 stream_set_blocking ($ pipes [1 ], false );
416423 stream_set_blocking ($ pipes [2 ], false );
417424
418- $ data = json_encode ( self :: getDataMainThread () );
419-
420- if (is_string ( $ data )) {
421- fwrite ( $ pipes [ 0 ], $ data );
422- fclose ( $ pipes [ 0 ]) ;
425+ $ status = proc_get_status ( $ process );
426+ $ pid = $ status [ ' pid ' ];
427+ if (! isset ( self :: $ threads [ $ pid ] )) {
428+ $ this -> setPid ( $ pid );
429+ self :: $ threads [ $ pid ] = $ this ;
423430 }
424431
425- while (proc_get_status ($ process )['running ' ]) {
432+ $ thread = self ::$ threads [$ pid ];
433+ $ thread ->setExitCode ($ status ['exitcode ' ]);
434+ $ thread ->setRunning ($ status ['running ' ]);
435+ $ thread ->setSignaled ($ status ['signaled ' ]);
436+ $ thread ->setStopped ($ status ['stopped ' ]);
437+ while ($ thread ->isRunning ()) {
426438 $ status = proc_get_status ($ process );
427-
428- if (!isset (self ::$ threads [$ status ['pid ' ]])) {
429- $ this ->setPid ($ status ['pid ' ]);
430- self ::$ threads [$ status ['pid ' ]] = $ this ;
431- }
432-
433- $ thread = self ::$ threads [$ status ['pid ' ]];
434-
435439 $ thread ->setExitCode ($ status ['exitcode ' ]);
436440 $ thread ->setRunning ($ status ['running ' ]);
437441 $ thread ->setSignaled ($ status ['signaled ' ]);
438442 $ thread ->setStopped ($ status ['stopped ' ]);
439443
440- if ($ thread ->isStopped ()) {
444+ if ($ thread ->isRunning ()) {
445+ $ read = [$ pipes [1 ], $ pipes [2 ]];
446+ $ write = null ;
447+ $ except = null ;
448+ $ timeout = 0 ;
449+
450+ $ n = stream_select ($ read , $ write , $ except , $ timeout );
451+ if ($ n === false ) break ;
452+ if ($ n > 0 ) {
453+ foreach ($ read as $ stream ) {
454+ if (!feof ($ stream )) {
455+ $ data = stream_get_contents ($ stream , 1024 );
456+ if ($ data === false || $ data === '' ) {
457+ if (feof ($ stream )) continue 2 ;
458+ break ;
459+ }
460+ $ stream === $ pipes [1 ] ? $ output .= $ data : $ error .= $ data ;
461+ }
462+ FiberManager::wait ();
463+ }
464+ }
465+ } elseif ($ thread ->isStopped () || $ thread ->isSignaled ()) {
466+ proc_terminate ($ process );
467+ break ;
468+ } else {
441469 proc_terminate ($ process );
442470 break ;
443471 }
444-
445472 FiberManager::wait ();
446473 }
447474
448- $ output = stream_get_contents ($ pipes [1 ]);
449- $ error = stream_get_contents ($ pipes [2 ]);
475+ $ outputStream = stream_get_contents ($ pipes [1 ]);
476+ $ errorStream = stream_get_contents ($ pipes [2 ]);
477+ $ output .= str_contains ($ output , $ outputStream ) ? '' : $ outputStream ;
478+ $ error .= str_contains ($ error , $ errorStream ) ? '' : $ errorStream ;
450479
451480 fclose ($ pipes [1 ]);
452481 fclose ($ pipes [2 ]);
@@ -455,12 +484,9 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
455484 return $ reject (new ThreadException ($ error ));
456485 } else {
457486 if (!is_bool ($ output )) {
458- $ explode = explode (PHP_EOL , $ output );
459- foreach ($ explode as $ item ) {
460- if ($ item !== '' && self ::isPostMainThread ($ item )) self ::loadSharedData ($ item );
461- elseif ($ item !== '' && self ::isPostThread ($ item )) {
462- $ output = Utils::getStringAfterSign ($ item , self ::POST_THREAD . '=> ' );
463- }
487+ if ($ output !== '' && self ::isPostMainThread ($ output )) self ::loadSharedData ($ output );
488+ elseif ($ output !== '' && self ::isPostThread ($ output )) {
489+ $ output = Utils::getStringAfterSign ($ output , self ::POST_THREAD . '=> ' );
464490 }
465491 }
466492 }
0 commit comments