diff --git a/src/Worker.php b/src/Worker.php index e2696d9..050e0d7 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -42,6 +42,10 @@ class Worker extends \Illuminate\Queue\Worker implements protected $extensions = []; + protected $processed = 0; + + protected $startTime = 0; + public function daemon($connectionName, $queueNames, WorkerOptions $options) { $this->connectionName = $connectionName; @@ -56,6 +60,7 @@ public function daemon($connectionName, $queueNames, WorkerOptions $options) parent::daemon($connectionName, $this->queueNames, $options); return; } + $this->startTime = hrtime(true); $context = $this->queue->getQueueInteropContext(); $queueConsumer = new QueueConsumer($context, new ChainExtension( @@ -63,6 +68,7 @@ public function daemon($connectionName, $queueNames, WorkerOptions $options) )); foreach (explode(',', $queueNames) as $queueName) { $queueConsumer->bindCallback($queueName, function() { + $this->processed++; $this->runJob($this->job, $this->connectionName, $this->options); return Result::ALREADY_ACKNOWLEDGED; @@ -87,6 +93,8 @@ public function runNextJob($connectionName, $queueNames, WorkerOptions $options) return; } + $this->startTime = hrtime(true); + $context = $this->queue->getQueueInteropContext(); $queueConsumer = new QueueConsumer($context, new ChainExtension($this->getAllExtensions([ @@ -96,6 +104,7 @@ public function runNextJob($connectionName, $queueNames, WorkerOptions $options) foreach (explode(',', $queueNames) as $queueName) { $queueConsumer->bindCallback($queueName, function() { + $this->processed++; $this->runJob($this->job, $this->connectionName, $this->options); return Result::ALREADY_ACKNOWLEDGED; @@ -143,7 +152,7 @@ public function onMessageReceived(MessageReceived $context): void public function onPostMessageReceived(PostMessageReceived $context): void { - $this->stopIfNecessary($this->options, $this->lastRestart, $this->job); + $this->stopIfNecessary($this->options, $this->lastRestart, $this->startTime, $this->processed, $this->job); if ($this->stopped) { $context->interruptExecution();