9
9
*/
10
10
class Resque_Worker
11
11
{
12
- const LOG_NONE = 0 ;
13
- const LOG_NORMAL = 1 ;
14
- const LOG_VERBOSE = 2 ;
15
-
16
12
/**
17
- * @var int Current log level of this worker.
18
- */
19
- public $ logLevel = 0 ;
13
+ * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
14
+ */
15
+ public $ logger ;
20
16
21
17
/**
22
18
* @var array Array of all associated queues for this worker.
@@ -161,7 +157,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
161
157
$ job = false ;
162
158
if (!$ this ->paused ) {
163
159
if ($ blocking === true ) {
164
- $ this ->log ('Starting blocking with timeout of ' . $ interval, self :: LOG_VERBOSE );
160
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Starting blocking with timeout of {interval} ' , array ( ' interval ' => $ interval) );
165
161
$ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with blocking timeout ' . $ interval );
166
162
} else {
167
163
$ this ->updateProcLine ('Waiting for ' . implode (', ' , $ this ->queues ) . ' with interval ' . $ interval );
@@ -179,7 +175,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
179
175
if ($ blocking === false )
180
176
{
181
177
// If no job was found, we sleep for $interval before continuing and checking again
182
- $ this ->log ('Sleeping for ' . $ interval, self :: LOG_VERBOSE );
178
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Sleeping for {interval} ' , array ( ' interval ' => $ interval) );
183
179
if ($ this ->paused ) {
184
180
$ this ->updateProcLine ('Paused ' );
185
181
}
@@ -193,7 +189,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
193
189
continue ;
194
190
}
195
191
196
- $ this ->log (' got ' . $ job );
192
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , ' Starting work on {job} ' , array ( ' job ' => $ job) );
197
193
Resque_Event::trigger ('beforeFork ' , $ job );
198
194
$ this ->workingOn ($ job );
199
195
@@ -203,7 +199,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
203
199
if ($ this ->child === 0 || $ this ->child === false ) {
204
200
$ status = 'Processing ' . $ job ->queue . ' since ' . strftime ('%F %T ' );
205
201
$ this ->updateProcLine ($ status );
206
- $ this ->log ($ status , self :: LOG_VERBOSE );
202
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , $ status );
207
203
$ this ->perform ($ job );
208
204
if ($ this ->child === 0 ) {
209
205
exit (0 );
@@ -214,7 +210,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
214
210
// Parent process, sit and wait
215
211
$ status = 'Forked ' . $ this ->child . ' at ' . strftime ('%F %T ' );
216
212
$ this ->updateProcLine ($ status );
217
- $ this ->log ($ status , self :: LOG_VERBOSE );
213
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , $ status );
218
214
219
215
// Wait until the child process finishes before continuing
220
216
pcntl_wait ($ status );
@@ -245,13 +241,13 @@ public function perform(Resque_Job $job)
245
241
$ job ->perform ();
246
242
}
247
243
catch (Exception $ e ) {
248
- $ this ->log ($ job . ' failed: ' . $ e ->getMessage ());
244
+ $ this ->logger -> log (Psr \ Log \LogLevel:: CRITICAL , ' { job} has failed {stack} ' , array ( ' job ' => $ job , ' stack ' => $ e ->getMessage () ));
249
245
$ job ->fail ($ e );
250
246
return ;
251
247
}
252
248
253
249
$ job ->updateStatus (Resque_Job_Status::STATUS_COMPLETE );
254
- $ this ->log (' done ' . $ job );
250
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , ' {job} has finished ' , array ( ' job ' => $ job) );
255
251
}
256
252
257
253
/**
@@ -269,15 +265,15 @@ public function reserve($blocking = false, $timeout = null)
269
265
if ($ blocking === true ) {
270
266
$ job = Resque_Job::reserveBlocking ($ queues , $ timeout );
271
267
if ($ job ) {
272
- $ this ->log ('Found job on ' . $ job ->queue , self :: LOG_VERBOSE );
268
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Found job on {queue} ' , array ( ' queue ' => $ job ->queue ) );
273
269
return $ job ;
274
270
}
275
271
} else {
276
272
foreach ($ queues as $ queue ) {
277
- $ this ->log ('Checking ' . $ queue, self :: LOG_VERBOSE );
273
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Checking {queue} for jobs ' , array ( ' queue ' => $ queue) );
278
274
$ job = Resque_Job::reserve ($ queue );
279
275
if ($ job ) {
280
- $ this ->log ('Found job on ' . $ queue, self :: LOG_VERBOSE );
276
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Found job on {queue} ' , array ( ' queue ' => $ job -> queue ) );
281
277
return $ job ;
282
278
}
283
279
}
@@ -355,15 +351,15 @@ private function registerSigHandlers()
355
351
pcntl_signal (SIGUSR2 , array ($ this , 'pauseProcessing ' ));
356
352
pcntl_signal (SIGCONT , array ($ this , 'unPauseProcessing ' ));
357
353
pcntl_signal (SIGPIPE , array ($ this , 'reestablishRedisConnection ' ));
358
- $ this ->log ('Registered signals ' , self :: LOG_VERBOSE );
354
+ $ this ->logger -> log (Psr \ Log \LogLevel:: DEBUG , 'Registered signals ' );
359
355
}
360
356
361
357
/**
362
358
* Signal handler callback for USR2, pauses processing of new jobs.
363
359
*/
364
360
public function pauseProcessing ()
365
361
{
366
- $ this ->log ('USR2 received; pausing job processing ' );
362
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , 'USR2 received; pausing job processing ' );
367
363
$ this ->paused = true ;
368
364
}
369
365
@@ -373,7 +369,7 @@ public function pauseProcessing()
373
369
*/
374
370
public function unPauseProcessing ()
375
371
{
376
- $ this ->log ('CONT received; resuming job processing ' );
372
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , 'CONT received; resuming job processing ' );
377
373
$ this ->paused = false ;
378
374
}
379
375
@@ -383,7 +379,7 @@ public function unPauseProcessing()
383
379
*/
384
380
public function reestablishRedisConnection ()
385
381
{
386
- $ this ->log ('SIGPIPE received; attempting to reconnect ' );
382
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , 'SIGPIPE received; attempting to reconnect ' );
387
383
Resque::redis ()->establishConnection ();
388
384
}
389
385
@@ -394,7 +390,7 @@ public function reestablishRedisConnection()
394
390
public function shutdown ()
395
391
{
396
392
$ this ->shutdown = true ;
397
- $ this ->log (' Exiting... ' );
393
+ $ this ->logger -> log (Psr \ Log \LogLevel:: NOTICE , ' Shutting down ' );
398
394
}
399
395
400
396
/**
@@ -414,18 +410,18 @@ public function shutdownNow()
414
410
public function killChild ()
415
411
{
416
412
if (!$ this ->child ) {
417
- $ this ->log ('No child to kill. ' , self :: LOG_VERBOSE );
413
+ $ this ->logger -> log (Psr \ Log \LogLevel:: DEBUG , 'No child to kill. ' );
418
414
return ;
419
415
}
420
416
421
- $ this ->log ('Killing child at ' . $ this ->child , self :: LOG_VERBOSE );
417
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Killing child at {child} ' , array ( ' child ' => $ this ->child ) );
422
418
if (exec ('ps -o pid,state -p ' . $ this ->child , $ output , $ returnCode ) && $ returnCode != 1 ) {
423
- $ this ->log (' Killing child at ' . $ this ->child , self :: LOG_VERBOSE );
419
+ $ this ->logger -> log (Psr \ Log \LogLevel:: DEBUG , ' Child { child} found, killing. ' , array ( ' child ' => $ this ->child ) );
424
420
posix_kill ($ this ->child , SIGKILL );
425
421
$ this ->child = null ;
426
422
}
427
423
else {
428
- $ this ->log ('Child ' . $ this -> child . ' not found, restarting. ' , self :: LOG_VERBOSE );
424
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Child { child} not found, restarting. ' , array ( ' child ' => $ this -> child ) );
429
425
$ this ->shutdown ();
430
426
}
431
427
}
@@ -448,7 +444,7 @@ public function pruneDeadWorkers()
448
444
if ($ host != $ this ->hostname || in_array ($ pid , $ workerPids ) || $ pid == getmypid ()) {
449
445
continue ;
450
446
}
451
- $ this ->log ('Pruning dead worker: ' . (string )$ worker, self :: LOG_VERBOSE );
447
+ $ this ->logger -> log (Psr \ Log \LogLevel:: INFO , 'Pruning dead worker: {worker} ' , array ( ' worker ' => (string )$ worker) );
452
448
$ worker ->unregisterWorker ();
453
449
}
454
450
}
@@ -536,26 +532,6 @@ public function __toString()
536
532
return $ this ->id ;
537
533
}
538
534
539
- /**
540
- * Output a given log message to STDOUT.
541
- *
542
- * @param string $message Message to output.
543
- * @param int $logLevel The logging level to capture
544
- */
545
- public function log ($ message , $ logLevel = self ::LOG_NORMAL )
546
- {
547
- if ($ logLevel > $ this ->logLevel ) {
548
- return ;
549
- }
550
-
551
- if ($ this ->logLevel == self ::LOG_NORMAL ) {
552
- fwrite (STDOUT , "*** " . $ message . "\n" );
553
- return ;
554
- }
555
-
556
- fwrite (STDOUT , "** [ " . strftime ('%T %Y-%m-%d ' ) . "] " . $ message . "\n" );
557
- }
558
-
559
535
/**
560
536
* Return an object describing the job this worker is currently working on.
561
537
*
@@ -582,5 +558,15 @@ public function getStat($stat)
582
558
{
583
559
return Resque_Stat::get ($ stat . ': ' . $ this );
584
560
}
561
+
562
+ /**
563
+ * Inject the logging object into the worker
564
+ *
565
+ * @param Psr\Log\LoggerInterface $logger
566
+ */
567
+ public function setLogger (Psr \Log \LoggerInterface $ logger )
568
+ {
569
+ $ this ->logger = $ logger ;
570
+ }
585
571
}
586
572
?>
0 commit comments