@@ -43,26 +43,31 @@ public function start()
4343 $ this ->setProcName ();
4444
4545 for ($ id = 1 ; $ id <= $ this ->count ; $ id ++) {
46- $ pid = $ this ->fork ();
46+ $ this ->forkWorker ($ id );
47+ }
4748
48- if ($ pid > 0 ) {
49- static ::$ workers [$ id ] = $ pid ;
50- } else {
51- static ::$ isMaster = false ;
52- $ this ->id = $ id ;
53- $ this ->workerPid = posix_getpid ();
49+ $ this ->waitAll ();
50+ }
5451
55- $ this ->setProcName ();
52+ protected function forkWorker ($ id )
53+ {
54+ $ pid = $ this ->fork ();
5655
57- register_shutdown_function ([$ this , 'handleShutdown ' ]);
56+ if ($ pid > 0 ) {
57+ static ::$ workers [$ id ] = $ pid ;
58+ } else {
59+ static ::$ isMaster = false ;
60+ $ this ->id = $ id ;
61+ $ this ->workerPid = posix_getpid ();
5862
59- // Run worker.
60- $ this ->run ();
61- exit (0 );
62- }
63- }
63+ $ this ->setProcName ();
6464
65- $ this ->waitAll ();
65+ register_shutdown_function ([$ this , 'handleShutdown ' ]);
66+
67+ // Run worker.
68+ $ this ->run ();
69+ exit (0 );
70+ }
6671 }
6772
6873 protected function daemonize ()
@@ -123,10 +128,13 @@ protected function waitAll()
123128 while (($ pid = pcntl_wait ($ status , WUNTRACED )) != -1 ) {
124129 if ($ pid > 0 ) {
125130 $ id = array_search ($ pid , static ::$ workers );
126- if ($ id ! == false ) {
127- unset( static :: $ workers [ $ id ]) ;
131+ if ($ id = == false ) {
132+ continue ;
128133 }
129- $ this ->log ("[worker. $ id $ pid] exited with status $ status " );
134+ $ this ->log ("[worker: $ id $ pid] exited with status $ status " );
135+ unset(static ::$ workers [$ id ]);
136+ // refork
137+ $ this ->forkWorker ($ id );
130138 }
131139 }
132140
@@ -150,15 +158,14 @@ protected function run()
150158 // Run job.
151159 call_user_func ($ this ->job , $ this );
152160 } catch (\Throwable $ e ) {
153- $ this ->log ("[worker. {$ this ->id } {$ this ->workerPid }] " . $ e );
154- // rerun
155- $ this ->run ();
161+ $ this ->log ("[worker: {$ this ->id } {$ this ->workerPid }] " . $ e );
162+ exit (250 );
156163 }
157164 }
158165
159166 public function handleShutdown ()
160167 {
161- $ errmsg = "[worker. {$ this ->id } {$ this ->workerPid }] process terminated " ;
168+ $ errmsg = "[worker: {$ this ->id } {$ this ->workerPid }] process terminated " ;
162169 // Handle last error.
163170 $ error = error_get_last ();
164171 if ($ error ) {
@@ -223,7 +230,7 @@ public function signalShutdownHandler($signo)
223230 unset(static ::$ workers [$ id ]);
224231 }
225232 } else {
226- $ this ->log ("[worker. {$ this ->id } {$ this ->workerPid }] " . $ msg );
233+ $ this ->log ("[worker: {$ this ->id } {$ this ->workerPid }] " . $ msg );
227234
228235 // Worker process exit.
229236 exit (0 );
0 commit comments