diff --git a/src/Commands/QueueRunCommand.php b/src/Commands/QueueRunCommand.php index 14bfcc5..237378c 100644 --- a/src/Commands/QueueRunCommand.php +++ b/src/Commands/QueueRunCommand.php @@ -65,6 +65,21 @@ protected function handle(): int $this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s"); try { + $this->worker->setOnJobProcessing(function ($job) { + $jobClass = get_class($job); + $jobId = $job->getJobId() ?? 'N/A'; + $this->info("✔ Processing job [{$jobClass}] (ID: {$jobId})"); + }); + + $this->worker->setOnJobProcessed(function ($job) { + $jobClass = get_class($job); + $jobId = $job->getJobId() ?? 'N/A'; + $this->info("✔ Processed job [{$jobClass}] (ID: {$jobId})"); + + // Flush system output buffer + flush(); + }); + $this->worker->daemon($queue, [ 'sleep' => $sleep, 'maxMemory' => $maxMemory, diff --git a/src/QueueWorker.php b/src/QueueWorker.php index 64ba266..f05d850 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -42,6 +42,21 @@ class QueueWorker */ protected $maxMemory = 128; + /** + * Callback to be executed **before** a job is processed. + * + * @var callable|null + */ + protected $onJobProcessing; + + /** + * Callback to be executed **after** a job has been successfully processed. + * + * + * @var callable|null + */ + protected $onJobProcessed; + /** * Create a new queue worker. * @@ -52,6 +67,28 @@ public function __construct(QueueManager $manager) $this->manager = $manager; } + /** + * Set the callback to be executed **before** a job is processed. + * + * @param callable $callback + * @return void + */ + public function setOnJobProcessing(callable $callback): void + { + $this->onJobProcessing = $callback; + } + + /** + * Set the callback to be executed **after** a job has been processed. + * + * @param callable $callback + * @return void + */ + public function setOnJobProcessed(callable $callback): void + { + $this->onJobProcessed = $callback; + } + /** * Run the worker daemon. * @@ -125,13 +162,20 @@ protected function processJob(QueueJob $queueJob): void $job = $this->manager->unserializeJob($queueJob->payload); $job->attempts = $queueJob->attempts; + if (is_callable($this->onJobProcessing)) { + ($this->onJobProcessing)($job); + } + // Execute the job $this->executeJob($job); // Delete the job from queue if successful $this->manager->delete($queueJob); - $this->logInfo("Job {$job->getJobId()} processed successfully"); + // Trigger onJobProcessed callback + if (is_callable($this->onJobProcessed)) { + ($this->onJobProcessed)($job); + } } catch (\Throwable $e) { $this->handleJobException($queueJob, $job ?? null, $e); }