@@ -86,7 +86,7 @@ public static function exists($workerId)
86
86
*/
87
87
public static function find ($ workerId )
88
88
{
89
- if (!self ::exists ($ workerId ) || false === strpos ($ workerId , ": " )) {
89
+ if (!self ::exists ($ workerId ) || false === strpos ($ workerId , ": " )) {
90
90
return false ;
91
91
}
92
92
@@ -148,83 +148,83 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
148
148
$ this ->updateProcLine ('Starting ' );
149
149
$ this ->startup ();
150
150
151
- while (true ) {
152
- if ($ this ->shutdown ) {
153
- break ;
154
- }
155
-
156
- // Attempt to find and reserve a job
157
- $ job = false ;
158
- if (!$ this ->paused ) {
159
- if ($ blocking === true ) {
160
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Starting blocking with timeout of {interval} ' , array ('interval ' => $ interval ));
161
- $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with blocking timeout ' . $ interval );
162
- } else {
163
- $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with interval ' . $ interval );
164
- }
165
-
166
- $ job = $ this ->reserve ($ blocking , $ interval );
167
- }
168
-
169
- if (!$ job ) {
170
- // For an interval of 0, break now - helps with unit testing etc
171
- if ($ interval == 0 ) {
172
- break ;
173
- }
174
-
175
- if ($ blocking === false )
176
- {
177
- // If no job was found, we sleep for $interval before continuing and checking again
178
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Sleeping for {interval} ' , array ('interval ' => $ interval ));
179
- if ($ this ->paused ) {
180
- $ this ->updateProcLine ('Paused ' );
181
- }
182
- else {
183
- $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ));
184
- }
185
-
186
- usleep ($ interval * 1000000 );
187
- }
188
-
189
- continue ;
190
- }
191
-
192
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'Starting work on {job} ' , array ('job ' => $ job ));
193
- Resque_Event::trigger ('beforeFork ' , $ job );
194
- $ this ->workingOn ($ job );
195
-
196
- $ this ->child = Resque::fork ();
197
-
198
- // Forked and we're the child. Run the job.
199
- if ($ this ->child === 0 || $ this ->child === false ) {
200
- $ status = 'Processing ' . $ job ->queue . ' since ' . strftime ('%F %T ' );
201
- $ this ->updateProcLine ($ status );
202
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , $ status );
203
- $ this ->perform ($ job );
204
- if ($ this ->child === 0 ) {
205
- exit (0 );
206
- }
207
- }
208
-
209
- if ($ this ->child > 0 ) {
210
- // Parent process, sit and wait
211
- $ status = 'Forked ' . $ this ->child . ' at ' . strftime ('%F %T ' );
212
- $ this ->updateProcLine ($ status );
213
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , $ status );
214
-
215
- // Wait until the child process finishes before continuing
216
- pcntl_wait ($ status );
217
- $ exitStatus = pcntl_wexitstatus ($ status );
218
- if ($ exitStatus !== 0 ) {
219
- $ job ->fail (new Resque_Job_DirtyExitException (
220
- 'Job exited with exit code ' . $ exitStatus
221
- ));
222
- }
223
- }
224
-
225
- $ this ->child = null ;
226
- $ this ->doneWorking ();
227
- }
151
+ while (true ) {
152
+ if ($ this ->shutdown ) {
153
+ break ;
154
+ }
155
+
156
+ // Attempt to find and reserve a job
157
+ $ job = false ;
158
+ if (!$ this ->paused ) {
159
+ if ($ blocking === true ) {
160
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Starting blocking with timeout of {interval} ' , array ('interval ' => $ interval ));
161
+ $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with blocking timeout ' . $ interval );
162
+ } else {
163
+ $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with interval ' . $ interval );
164
+ }
165
+
166
+ $ job = $ this ->reserve ($ blocking , $ interval );
167
+ }
168
+
169
+ if (!$ job ) {
170
+ // For an interval of 0, break now - helps with unit testing etc
171
+ if ($ interval == 0 ) {
172
+ break ;
173
+ }
174
+
175
+ if ($ blocking === false )
176
+ {
177
+ // If no job was found, we sleep for $interval before continuing and checking again
178
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Sleeping for {interval} ' , array ('interval ' => $ interval ));
179
+ if ($ this ->paused ) {
180
+ $ this ->updateProcLine ('Paused ' );
181
+ }
182
+ else {
183
+ $ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ));
184
+ }
185
+
186
+ usleep ($ interval * 1000000 );
187
+ }
188
+
189
+ continue ;
190
+ }
191
+
192
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'Starting work on {job} ' , array ('job ' => $ job ));
193
+ Resque_Event::trigger ('beforeFork ' , $ job );
194
+ $ this ->workingOn ($ job );
195
+
196
+ $ this ->child = Resque::fork ();
197
+
198
+ // Forked and we're the child. Run the job.
199
+ if ($ this ->child === 0 || $ this ->child === false ) {
200
+ $ status = 'Processing ' . $ job ->queue . ' since ' . strftime ('%F %T ' );
201
+ $ this ->updateProcLine ($ status );
202
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , $ status );
203
+ $ this ->perform ($ job );
204
+ if ($ this ->child === 0 ) {
205
+ exit (0 );
206
+ }
207
+ }
208
+
209
+ if ($ this ->child > 0 ) {
210
+ // Parent process, sit and wait
211
+ $ status = 'Forked ' . $ this ->child . ' at ' . strftime ('%F %T ' );
212
+ $ this ->updateProcLine ($ status );
213
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , $ status );
214
+
215
+ // Wait until the child process finishes before continuing
216
+ pcntl_wait ($ status );
217
+ $ exitStatus = pcntl_wexitstatus ($ status );
218
+ if ($ exitStatus !== 0 ) {
219
+ $ job ->fail (new Resque_Job_DirtyExitException (
220
+ 'Job exited with exit code ' . $ exitStatus
221
+ ));
222
+ }
223
+ }
224
+
225
+ $ this ->child = null ;
226
+ $ this ->doneWorking ();
227
+ }
228
228
229
229
$ this ->unregisterWorker ();
230
230
}
@@ -241,46 +241,46 @@ public function perform(Resque_Job $job)
241
241
$ job ->perform ();
242
242
}
243
243
catch (Exception $ e ) {
244
- $ this ->logger ->log (Psr \Log \LogLevel::CRITICAL , '{job} has failed {stack} ' , array ('job ' => $ job , 'stack ' => $ e ->getMessage ()));
244
+ $ this ->logger ->log (Psr \Log \LogLevel::CRITICAL , '{job} has failed {stack} ' , array ('job ' => $ job , 'stack ' => $ e ->getMessage ()));
245
245
$ job ->fail ($ e );
246
246
return ;
247
247
}
248
248
249
249
$ job ->updateStatus (Resque_Job_Status::STATUS_COMPLETE );
250
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , '{job} has finished ' , array ('job ' => $ job ));
250
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , '{job} has finished ' , array ('job ' => $ job ));
251
251
}
252
252
253
- /**
254
- * @param bool $blocking
255
- * @param int $timeout
256
- * @return object|boolean Instance of Resque_Job if a job is found, false if not.
257
- */
258
- public function reserve ($ blocking = false , $ timeout = null )
259
- {
260
- $ queues = $ this ->queues ();
261
- if (!is_array ($ queues )) {
262
- return ;
263
- }
264
-
265
- if ($ blocking === true ) {
266
- $ job = Resque_Job::reserveBlocking ($ queues , $ timeout );
267
- if ($ job ) {
268
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Found job on {queue} ' , array ('queue ' => $ job ->queue ));
269
- return $ job ;
270
- }
271
- } else {
272
- foreach ($ queues as $ queue ) {
273
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Checking {queue} for jobs ' , array ('queue ' => $ queue ));
274
- $ job = Resque_Job::reserve ($ queue );
275
- if ($ job ) {
276
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Found job on {queue} ' , array ('queue ' => $ job ->queue ));
277
- return $ job ;
278
- }
279
- }
280
- }
281
-
282
- return false ;
283
- }
253
+ /**
254
+ * @param bool $blocking
255
+ * @param int $timeout
256
+ * @return object|boolean Instance of Resque_Job if a job is found, false if not.
257
+ */
258
+ public function reserve ($ blocking = false , $ timeout = null )
259
+ {
260
+ $ queues = $ this ->queues ();
261
+ if (!is_array ($ queues )) {
262
+ return ;
263
+ }
264
+
265
+ if ($ blocking === true ) {
266
+ $ job = Resque_Job::reserveBlocking ($ queues , $ timeout );
267
+ if ($ job ) {
268
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Found job on {queue} ' , array ('queue ' => $ job ->queue ));
269
+ return $ job ;
270
+ }
271
+ } else {
272
+ foreach ($ queues as $ queue ) {
273
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Checking {queue} for jobs ' , array ('queue ' => $ queue ));
274
+ $ job = Resque_Job::reserve ($ queue );
275
+ if ($ job ) {
276
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Found job on {queue} ' , array ('queue ' => $ job ->queue ));
277
+ return $ job ;
278
+ }
279
+ }
280
+ }
281
+
282
+ return false ;
283
+ }
284
284
285
285
/**
286
286
* Return an array containing all of the queues that this worker should use
@@ -351,15 +351,15 @@ private function registerSigHandlers()
351
351
pcntl_signal (SIGUSR2 , array ($ this , 'pauseProcessing ' ));
352
352
pcntl_signal (SIGCONT , array ($ this , 'unPauseProcessing ' ));
353
353
pcntl_signal (SIGPIPE , array ($ this , 'reestablishRedisConnection ' ));
354
- $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'Registered signals ' );
354
+ $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'Registered signals ' );
355
355
}
356
356
357
357
/**
358
358
* Signal handler callback for USR2, pauses processing of new jobs.
359
359
*/
360
360
public function pauseProcessing ()
361
361
{
362
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'USR2 received; pausing job processing ' );
362
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'USR2 received; pausing job processing ' );
363
363
$ this ->paused = true ;
364
364
}
365
365
@@ -369,7 +369,7 @@ public function pauseProcessing()
369
369
*/
370
370
public function unPauseProcessing ()
371
371
{
372
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'CONT received; resuming job processing ' );
372
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'CONT received; resuming job processing ' );
373
373
$ this ->paused = false ;
374
374
}
375
375
@@ -379,7 +379,7 @@ public function unPauseProcessing()
379
379
*/
380
380
public function reestablishRedisConnection ()
381
381
{
382
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'SIGPIPE received; attempting to reconnect ' );
382
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'SIGPIPE received; attempting to reconnect ' );
383
383
Resque::redis ()->establishConnection ();
384
384
}
385
385
@@ -390,7 +390,7 @@ public function reestablishRedisConnection()
390
390
public function shutdown ()
391
391
{
392
392
$ this ->shutdown = true ;
393
- $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'Shutting down ' );
393
+ $ this ->logger ->log (Psr \Log \LogLevel::NOTICE , 'Shutting down ' );
394
394
}
395
395
396
396
/**
@@ -410,18 +410,18 @@ public function shutdownNow()
410
410
public function killChild ()
411
411
{
412
412
if (!$ this ->child ) {
413
- $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'No child to kill. ' );
413
+ $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'No child to kill. ' );
414
414
return ;
415
415
}
416
416
417
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Killing child at {child} ' , array ('child ' => $ this ->child ));
417
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Killing child at {child} ' , array ('child ' => $ this ->child ));
418
418
if (exec ('ps -o pid,state -p ' . $ this ->child , $ output , $ returnCode ) && $ returnCode != 1 ) {
419
- $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'Child {child} found, killing. ' , array ('child ' => $ this ->child ));
419
+ $ this ->logger ->log (Psr \Log \LogLevel::DEBUG , 'Child {child} found, killing. ' , array ('child ' => $ this ->child ));
420
420
posix_kill ($ this ->child , SIGKILL );
421
421
$ this ->child = null ;
422
422
}
423
423
else {
424
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Child {child} not found, restarting. ' , array ('child ' => $ this ->child ));
424
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Child {child} not found, restarting. ' , array ('child ' => $ this ->child ));
425
425
$ this ->shutdown ();
426
426
}
427
427
}
@@ -439,14 +439,14 @@ public function pruneDeadWorkers()
439
439
$ workerPids = $ this ->workerPids ();
440
440
$ workers = self ::all ();
441
441
foreach ($ workers as $ worker ) {
442
- if (is_object ($ worker )) {
443
- list ($ host , $ pid , $ queues ) = explode (': ' , (string )$ worker , 3 );
444
- if ($ host != $ this ->hostname || in_array ($ pid , $ workerPids ) || $ pid == getmypid ()) {
445
- continue ;
446
- }
447
- $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Pruning dead worker: {worker} ' , array ('worker ' => (string )$ worker ));
448
- $ worker ->unregisterWorker ();
449
- }
442
+ if (is_object ($ worker )) {
443
+ list ($ host , $ pid , $ queues ) = explode (': ' , (string )$ worker , 3 );
444
+ if ($ host != $ this ->hostname || in_array ($ pid , $ workerPids ) || $ pid == getmypid ()) {
445
+ continue ;
446
+ }
447
+ $ this ->logger ->log (Psr \Log \LogLevel::INFO , 'Pruning dead worker: {worker} ' , array ('worker ' => (string )$ worker ));
448
+ $ worker ->unregisterWorker ();
449
+ }
450
450
}
451
451
}
452
452
0 commit comments